You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sh...@apache.org on 2020/11/12 06:21:56 UTC
[ozone] branch master updated: HDDS-4417. Simplify Ozone client code with configuration object (#1542)
This is an automated email from the ASF dual-hosted git repository.
shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fdbd5da HDDS-4417. Simplify Ozone client code with configuration object (#1542)
fdbd5da is described below
commit fdbd5da0d02e6447c91d34da7bb1809356fec71e
Author: Elek, Márton <el...@users.noreply.github.com>
AuthorDate: Thu Nov 12 07:21:36 2020 +0100
HDDS-4417. Simplify Ozone client code with configuration object (#1542)
---
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 213 +++++++++++++++++++++
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 66 +++----
.../storage/TestBlockOutputStreamCorrectness.java | 17 +-
.../hdds/scm/storage/TestChunkInputStream.java | 11 +-
.../hdds/scm/storage/ContainerProtocolCalls.java | 4 +-
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 68 ++-----
.../org/apache/hadoop/ozone/common/Checksum.java | 22 +--
.../common/src/main/resources/ozone-default.xml | 81 --------
.../ratis/TestContainerCommandRequestMessage.java | 13 +-
.../hdds/conf/ConfigurationReflectionUtil.java | 14 ++
.../hadoop/hdds/conf/ConfigurationTarget.java | 4 +
.../ozone/container/ContainerTestHelper.java | 6 +-
.../common/impl/TestContainerPersistence.java | 53 ++---
.../ozone/client/io/BlockOutputStreamEntry.java | 135 +++----------
.../client/io/BlockOutputStreamEntryPool.java | 89 ++-------
.../hadoop/ozone/client/io/KeyOutputStream.java | 115 ++++-------
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 99 ++--------
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 28 +--
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 49 +++--
.../ozone/client/rpc/TestBlockOutputStream.java | 34 ++--
.../rpc/TestBlockOutputStreamFlushDelay.java | 22 +--
.../rpc/TestBlockOutputStreamWithFailures.java | 39 ++--
...estBlockOutputStreamWithFailuresFlushDelay.java | 34 ++--
.../rpc/TestCloseContainerHandlingByClient.java | 35 ++--
.../hadoop/ozone/client/rpc/TestCommitWatcher.java | 7 +-
.../client/rpc/TestContainerStateMachine.java | 35 ++--
.../rpc/TestContainerStateMachineFailures.java | 64 +++----
.../client/rpc/TestDiscardPreallocatedBlocks.java | 35 ++--
.../client/rpc/TestFailureHandlingByClient.java | 7 +-
.../ozone/client/rpc/TestKeyInputStream.java | 33 ++--
.../rpc/TestOzoneClientRetriesOnException.java | 43 +++--
...estOzoneClientRetriesOnExceptionFlushDelay.java | 31 +--
.../ozone/client/rpc/TestOzoneRpcClient.java | 7 +-
.../client/rpc/TestOzoneRpcClientAbstract.java | 16 +-
.../client/rpc/TestOzoneRpcClientWithRatis.java | 2 +
.../client/rpc/TestValidateBCSIDOnRestart.java | 41 ++--
.../ozone/client/rpc/TestWatchForCommit.java | 46 +++--
.../hadoop/ozone/freon/DatanodeBlockPutter.java | 7 +-
38 files changed, 789 insertions(+), 836 deletions(-)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
new file mode 100644
index 0000000..2a79edb
--- /dev/null
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/OzoneClientConfig.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Configuration values for Ozone Client.
+ */
+@ConfigGroup(prefix = "ozone.client")
+public class OzoneClientConfig {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(OzoneClientConfig.class);
+
+ @Config(key = "stream.buffer.flush.size",
+ defaultValue = "16MB",
+ type = ConfigType.SIZE,
+ description = "Size which determines at what buffer position a partial "
+ + "flush will be initiated during write. It should be a multiple of"
+ + " ozone.client.stream.buffer.size",
+ tags = ConfigTag.CLIENT)
+ private long streamBufferFlushSize = 16 * 1024 * 1024;
+
+ @Config(key = "stream.buffer.size",
+ defaultValue = "4MB",
+ type = ConfigType.SIZE,
+ description = "The size of chunks the client will send to the server",
+ tags = ConfigTag.CLIENT)
+ private int streamBufferSize = 4 * 1024 * 1024;
+
+ @Config(key = "stream.buffer.flush.delay",
+ defaultValue = "true",
+ description = "Default true, when call flush() and determine whether "
+ + "the data in the current buffer is greater than ozone.client"
+ + ".stream.buffer.size, if greater than then send buffer to the "
+ + "datanode. You can turn this off by setting this configuration "
+ + "to false.", tags = ConfigTag.CLIENT)
+ private boolean streamBufferFlushDelay = true;
+
+ @Config(key = "stream.buffer.max.size",
+ defaultValue = "32MB",
+ type = ConfigType.SIZE,
+ description = "Size which determines at what buffer position write call"
+ + " be blocked till acknowledgement of the first partial flush "
+ + "happens by all servers.",
+ tags = ConfigTag.CLIENT)
+ private long streamBufferMaxSize = 32 * 1024 * 1024;
+
+ @Config(key = "max.retries",
+ defaultValue = "5",
+ description = "Maximum number of retries by Ozone Client on "
+ + "encountering exception while writing a key",
+ tags = ConfigTag.CLIENT)
+ private int maxRetryCount = 5;
+
+ @Config(key = "retry.interval",
+ defaultValue = "0",
+ description =
+ "Indicates the time duration a client will wait before retrying a "
+ + "write key request on encountering an exception. By default "
+ + "there is no wait",
+ tags = ConfigTag.CLIENT)
+ private int retryInterval = 0;
+
+ @Config(key = "checksum.type",
+ defaultValue = "CRC32",
+ description = "The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] "
+ + "determines which algorithm would be used to compute checksum for "
+ + "chunk data. Default checksum type is CRC32.",
+ tags = ConfigTag.CLIENT)
+ private String checksumType = ChecksumType.CRC32.name();
+
+ @Config(key = "bytes.per.checksum",
+ defaultValue = "1MB",
+ type = ConfigType.SIZE,
+ description = "Checksum will be computed for every bytes per checksum "
+ + "number of bytes and stored sequentially. The minimum value for "
+ + "this config is 256KB.",
+ tags = ConfigTag.CLIENT)
+ private int bytesPerChecksum = 1024 * 1024;
+
+ @Config(key = "verify.checksum",
+ defaultValue = "true",
+ description = "Ozone client to verify checksum of the checksum "
+ + "blocksize data.",
+ tags = ConfigTag.CLIENT)
+ private boolean checksumVerify = true;
+
+ public OzoneClientConfig() {
+ }
+
+ private void validate() {
+ Preconditions.checkState(streamBufferSize > 0);
+ Preconditions.checkState(streamBufferFlushSize > 0);
+ Preconditions.checkState(streamBufferMaxSize > 0);
+
+ Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0,
+ "expected max. buffer size (%s) to be a multiple of flush size (%s)",
+ streamBufferMaxSize, streamBufferFlushSize);
+ Preconditions.checkState(streamBufferFlushSize % streamBufferSize == 0,
+ "expected flush size (%s) to be a multiple of buffer size (%s)",
+ streamBufferFlushSize, streamBufferSize);
+
+ if (bytesPerChecksum <
+ OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
+ LOG.warn("The checksum size ({}) is not allowed to be less than the " +
+ "minimum size ({}), resetting to the minimum size.",
+ bytesPerChecksum,
+ OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
+ bytesPerChecksum =
+ OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
+ }
+
+ }
+
+ public long getStreamBufferFlushSize() {
+ return streamBufferFlushSize;
+ }
+
+ public void setStreamBufferFlushSize(long streamBufferFlushSize) {
+ this.streamBufferFlushSize = streamBufferFlushSize;
+ }
+
+ public int getStreamBufferSize() {
+ return streamBufferSize;
+ }
+
+ public void setStreamBufferSize(int streamBufferSize) {
+ this.streamBufferSize = streamBufferSize;
+ }
+
+ public boolean isStreamBufferFlushDelay() {
+ return streamBufferFlushDelay;
+ }
+
+ public void setStreamBufferFlushDelay(boolean streamBufferFlushDelay) {
+ this.streamBufferFlushDelay = streamBufferFlushDelay;
+ }
+
+ public long getStreamBufferMaxSize() {
+ return streamBufferMaxSize;
+ }
+
+ public void setStreamBufferMaxSize(long streamBufferMaxSize) {
+ this.streamBufferMaxSize = streamBufferMaxSize;
+ }
+
+ public int getMaxRetryCount() {
+ return maxRetryCount;
+ }
+
+ public void setMaxRetryCount(int maxRetryCount) {
+ this.maxRetryCount = maxRetryCount;
+ }
+
+ public int getRetryInterval() {
+ return retryInterval;
+ }
+
+ public void setRetryInterval(int retryInterval) {
+ this.retryInterval = retryInterval;
+ }
+
+ public ChecksumType getChecksumType() {
+ return ChecksumType.valueOf(checksumType);
+ }
+
+ public void setChecksumType(ChecksumType checksumType) {
+ this.checksumType = checksumType.name();
+ }
+
+ public int getBytesPerChecksum() {
+ return bytesPerChecksum;
+ }
+
+ public void setBytesPerChecksum(int bytesPerChecksum) {
+ this.bytesPerChecksum = bytesPerChecksum;
+ }
+
+ public boolean isChecksumVerify() {
+ return checksumVerify;
+ }
+
+ public void setChecksumVerify(boolean checksumVerify) {
+ this.checksumVerify = checksumVerify;
+ }
+
+}
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index e048708..e29bbe3 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -34,9 +34,9 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -85,13 +85,10 @@ public class BlockOutputStream extends OutputStream {
private final BlockData.Builder containerBlockData;
private XceiverClientFactory xceiverClientFactory;
private XceiverClientSpi xceiverClient;
- private final int bytesPerChecksum;
+ private OzoneClientConfig config;
+
private int chunkIndex;
private final AtomicLong chunkOffset = new AtomicLong();
- private final int streamBufferSize;
- private final long streamBufferFlushSize;
- private final boolean streamBufferFlushDelay;
- private final long streamBufferMaxSize;
private final BufferPool bufferPool;
// The IOException will be set by response handling thread in case there is an
// exception received in the response. If the exception is set, the next
@@ -133,46 +130,39 @@ public class BlockOutputStream extends OutputStream {
* Creates a new BlockOutputStream.
*
* @param blockID block ID
- * @param xceiverClientFactory client manager that controls client
+ * @param xceiverClientManager client manager that controls client
* @param pipeline pipeline where block will be written
* @param bufferPool pool of buffers
- * @param streamBufferFlushSize flush size
- * @param streamBufferMaxSize max size of the currentBuffer
- * @param checksumType checksum type
- * @param bytesPerChecksum Bytes per checksum
- * @param token a token for this block (may be null)
*/
- @SuppressWarnings("parameternumber")
- public BlockOutputStream(BlockID blockID,
- XceiverClientFactory xceiverClientFactory, Pipeline pipeline,
- int streamBufferSize, long streamBufferFlushSize,
- boolean streamBufferFlushDelay, long streamBufferMaxSize,
- BufferPool bufferPool, ChecksumType checksumType,
- int bytesPerChecksum, Token<? extends TokenIdentifier> token)
- throws IOException {
+ public BlockOutputStream(
+ BlockID blockID,
+ XceiverClientFactory xceiverClientManager,
+ Pipeline pipeline,
+ BufferPool bufferPool,
+ OzoneClientConfig config,
+ Token<? extends TokenIdentifier> token
+ ) throws IOException {
+ this.xceiverClientFactory = xceiverClientManager;
+ this.config = config;
this.blockID = new AtomicReference<>(blockID);
KeyValue keyValue =
KeyValue.newBuilder().setKey("TYPE").setValue("KEY").build();
this.containerBlockData =
BlockData.newBuilder().setBlockID(blockID.getDatanodeBlockIDProtobuf())
.addMetadata(keyValue);
- this.xceiverClientFactory = xceiverClientFactory;
- this.xceiverClient = xceiverClientFactory.acquireClient(pipeline);
- this.streamBufferSize = streamBufferSize;
- this.streamBufferFlushSize = streamBufferFlushSize;
- this.streamBufferMaxSize = streamBufferMaxSize;
- this.streamBufferFlushDelay = streamBufferFlushDelay;
+ this.xceiverClient = xceiverClientManager.acquireClient(pipeline);
this.bufferPool = bufferPool;
- this.bytesPerChecksum = bytesPerChecksum;
this.token = token;
//number of buffers used before doing a flush
refreshCurrentBuffer(bufferPool);
- flushPeriod = (int) (streamBufferFlushSize / streamBufferSize);
+ flushPeriod = (int) (config.getStreamBufferFlushSize() / config
+ .getStreamBufferSize());
Preconditions
.checkArgument(
- (long) flushPeriod * streamBufferSize == streamBufferFlushSize);
+ (long) flushPeriod * config.getStreamBufferSize() == config
+ .getStreamBufferFlushSize());
// A single thread executor handle the responses of async requests
responseExecutor = Executors.newSingleThreadExecutor();
@@ -182,7 +172,8 @@ public class BlockOutputStream extends OutputStream {
writtenDataLength = 0;
failedServers = new ArrayList<>(0);
ioException = new AtomicReference<>(null);
- checksum = new Checksum(checksumType, bytesPerChecksum);
+ checksum = new Checksum(config.getChecksumType(),
+ config.getBytesPerChecksum());
}
private void refreshCurrentBuffer(BufferPool pool) {
@@ -290,7 +281,7 @@ public class BlockOutputStream extends OutputStream {
private void allocateNewBufferIfNeeded() {
if (currentBufferRemaining == 0) {
- currentBuffer = bufferPool.allocateBuffer(bytesPerChecksum);
+ currentBuffer = bufferPool.allocateBuffer(config.getBytesPerChecksum());
currentBufferRemaining = currentBuffer.remaining();
}
}
@@ -300,7 +291,7 @@ public class BlockOutputStream extends OutputStream {
}
private boolean isBufferPoolFull() {
- return bufferPool.computeBufferData() == streamBufferMaxSize;
+ return bufferPool.computeBufferData() == config.getStreamBufferMaxSize();
}
/**
@@ -318,7 +309,7 @@ public class BlockOutputStream extends OutputStream {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrying write length {} for blockID {}", len, blockID);
}
- Preconditions.checkArgument(len <= streamBufferMaxSize);
+ Preconditions.checkArgument(len <= config.getStreamBufferMaxSize());
int count = 0;
while (len > 0) {
ChunkBuffer buffer = bufferPool.getBuffer(count);
@@ -334,13 +325,13 @@ public class BlockOutputStream extends OutputStream {
// the buffer. We should just validate
// if we wrote data of size streamBufferMaxSize/streamBufferFlushSize to
// call for handling full buffer/flush buffer condition.
- if (writtenDataLength % streamBufferFlushSize == 0) {
+ if (writtenDataLength % config.getStreamBufferFlushSize() == 0) {
// reset the position to zero as now we will be reading the
// next buffer in the list
updateFlushLength();
executePutBlock(false, false);
}
- if (writtenDataLength == streamBufferMaxSize) {
+ if (writtenDataLength == config.getStreamBufferMaxSize()) {
handleFullBuffer();
}
}
@@ -486,8 +477,9 @@ public class BlockOutputStream extends OutputStream {
public void flush() throws IOException {
if (xceiverClientFactory != null && xceiverClient != null
&& bufferPool != null && bufferPool.getSize() > 0
- && (!streamBufferFlushDelay ||
- writtenDataLength - totalDataFlushedLength >= streamBufferSize)) {
+ && (!config.isStreamBufferFlushDelay() ||
+ writtenDataLength - totalDataFlushedLength
+ >= config.getStreamBufferSize())) {
try {
handleFlush(false);
} catch (ExecutionException e) {
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
index 545c588..71d04a0 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestBlockOutputStreamCorrectness.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRe
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientReply;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -97,17 +98,21 @@ public class TestBlockOutputStreamCorrectness {
Mockito.when(xcm.acquireClient(Mockito.any()))
.thenReturn(new MockXceiverClientSpi(pipeline));
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setStreamBufferSize(4 * 1024 * 1024);
+ config.setStreamBufferMaxSize(32 * 1024 * 1024);
+ config.setStreamBufferFlushDelay(true);
+ config.setStreamBufferFlushSize(16 * 1024 * 1024);
+ config.setChecksumType(ChecksumType.NONE);
+ config.setBytesPerChecksum(256 * 1024);
+
BlockOutputStream outputStream = new BlockOutputStream(
new BlockID(1L, 1L),
xcm,
pipeline,
- 4 * 1024 * 1024,
- 16 * 1024 * 1024,
- true,
- 32 * 1024 * 1024,
bufferPool,
- ChecksumType.NONE,
- 256 * 1024, null);
+ config,
+ null);
return outputStream;
}
diff --git a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
index 94ec157..eea8e1f 100644
--- a/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
+++ b/hadoop-hdds/client/src/test/java/org/apache/hadoop/hdds/scm/storage/TestChunkInputStream.java
@@ -18,9 +18,11 @@
package org.apache.hadoop.hdds.scm.storage;
+import java.io.EOFException;
+import java.util.Random;
+
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.test.GenericTestUtils;
@@ -28,9 +30,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import java.io.EOFException;
-import java.util.Random;
-
/**
* Tests for {@link ChunkInputStream}'s functionality.
*/
@@ -48,9 +47,7 @@ public class TestChunkInputStream {
@Before
public void setup() throws Exception {
- checksum = new Checksum(ChecksumType.valueOf(
- OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT),
- BYTES_PER_CHECKSUM);
+ checksum = new Checksum(ChecksumType.CRC32, BYTES_PER_CHECKSUM);
chunkData = generateRandomData(CHUNK_SIZE);
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
index eecc975..f681d0d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/ContainerProtocolCalls.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.CloseContainerRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
@@ -330,7 +331,8 @@ public final class ContainerProtocolCalls {
KeyValue keyValue =
KeyValue.newBuilder().setKey("OverWriteRequested").setValue("true")
.build();
- Checksum checksum = new Checksum();
+
+ Checksum checksum = new Checksum(ChecksumType.CRC32, 256);
final ChecksumData checksumData = checksum.computeChecksum(data);
ChunkInfo chunk =
ChunkInfo.newBuilder()
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 482ac88..a448e1a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.ozone;
-import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.annotation.InterfaceStability;
import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -117,43 +115,6 @@ public final class OzoneConfigKeys {
* */
public static final String OZONE_ADMINISTRATORS_WILDCARD = "*";
- public static final String OZONE_CLIENT_STREAM_BUFFER_SIZE =
- "ozone.client.stream.buffer.size";
-
- public static final String OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT =
- "4MB";
-
- public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE =
- "ozone.client.stream.buffer.flush.size";
-
- public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT =
- "16MB";
-
- public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE =
- "ozone.client.stream.buffer.max.size";
-
- public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT =
- "32MB";
-
- public static final String OZONE_CLIENT_MAX_RETRIES =
- "ozone.client.max.retries";
- public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 5;
- public static final String OZONE_CLIENT_RETRY_INTERVAL =
- "ozone.client.retry.interval";
- public static final TimeDuration OZONE_CLIENT_RETRY_INTERVAL_DEFAULT =
- TimeDuration.valueOf(0, TimeUnit.MILLISECONDS);
-
- /**
- * If this value is true, when the client calls the flush() method,
- * it checks whether the data in the buffer is greater than
- * OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT. If greater than,
- * send the data in the buffer to the datanode.
- * */
- public static final String OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY =
- "ozone.client.stream.buffer.flush.delay";
- public static final boolean OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT =
- true;
-
// This defines the overall connection limit for the connection pool used in
// RestClient.
public static final String OZONE_REST_CLIENT_HTTP_CONNECTION_MAX =
@@ -354,21 +315,9 @@ public final class OzoneConfigKeys {
public static final String OZONE_CONTAINER_COPY_WORKDIR =
"hdds.datanode.replication.work.dir";
- /**
- * Config properties to set client side checksum properties.
- */
- public static final String OZONE_CLIENT_CHECKSUM_TYPE =
- "ozone.client.checksum.type";
- public static final String OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT = "CRC32";
- public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM =
- "ozone.client.bytes.per.checksum";
- public static final String OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT = "1MB";
- public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES =
- 1024 * 1024;
+
public static final int OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE = 256 * 1024;
- public static final String OZONE_CLIENT_VERIFY_CHECKSUM =
- "ozone.client.verify.checksum";
- public static final boolean OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT = true;
+
public static final String OZONE_CLIENT_READ_TIMEOUT
= "ozone.client.read.timeout";
public static final String OZONE_CLIENT_READ_TIMEOUT_DEFAULT = "30s";
@@ -459,16 +408,21 @@ public final class OzoneConfigKeys {
"ssl.server.keystore.location";
public static final String OZONE_SERVER_HTTPS_TRUSTSTORE_LOCATION_KEY =
"ssl.server.truststore.location";
- public static final String OZONE_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY =
+ public static final String OZONE_SERVER_HTTPS_TRUSTSTORE_PASSWORD_KEY =
"ssl.server.truststore.password";
- public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY =
+ public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY =
"ozone.https.client.keystore.resource";
- public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT =
+ public static final String OZONE_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT =
"ssl-client.xml";
- public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY =
+ public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY =
"ozone.https.client.need-auth";
public static final boolean OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+ public static final String OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_KEY =
+ "ozone.om.keyname.character.check.enabled";
+ public static final boolean OZONE_OM_KEYNAME_CHARACTER_CHECK_ENABLED_DEFAULT =
+ false;
+
/**
* There is no need to instantiate this class.
*/
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
index 98974ee..d86f7b1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Checksum.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.ozone.common;
-import com.google.common.annotations.VisibleForTesting;
-
import java.nio.ByteBuffer;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
@@ -27,11 +25,11 @@ import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
-import com.google.common.primitives.Ints;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
- .ChecksumType;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.primitives.Ints;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -111,18 +109,6 @@ public class Checksum {
}
/**
- * Constructs a Checksum object with default ChecksumType and default
- * BytesPerChecksum.
- */
- @VisibleForTesting
- public Checksum() {
- this.checksumType = ChecksumType.valueOf(
- OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
- this.bytesPerChecksum = OzoneConfigKeys
- .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
- }
-
- /**
* Computes checksum for give data.
* @param data input data.
* @return ChecksumData computed for input data.
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 4853978..66fc8ca 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -386,59 +386,6 @@
</description>
</property>
<property>
- <name>ozone.client.stream.buffer.flush.delay</name>
- <value>true</value>
- <tag>OZONE, CLIENT</tag>
- <description>
- Default true, when call flush() and determine whether the data in the
- current buffer is greater than ozone.client.stream.buffer.size, if
- greater than then send buffer to the datanode. You can turn this off
- by setting this configuration to false.
- </description>
- </property>
- <property>
- <name>ozone.client.stream.buffer.size</name>
- <value>4MB</value>
- <tag>OZONE, CLIENT</tag>
- <description>The size of chunks the client will send to the server.
- </description>
- </property>
- <property>
- <name>ozone.client.stream.buffer.flush.size</name>
- <value>16MB</value>
- <tag>OZONE, CLIENT</tag>
- <description>Size which determines at what buffer position a partial
- flush will be initiated during write. It should be a multiple
- of ozone.client.stream.buffer.size.
- </description>
- </property>
- <property>
- <name>ozone.client.stream.buffer.max.size</name>
- <value>32MB</value>
- <tag>OZONE, CLIENT</tag>
- <description>Size which determines at what buffer position
- write call be blocked till acknowledgement of the first partial flush
- happens by all servers.
- </description>
- </property>
- <property>
- <name>ozone.client.max.retries</name>
- <value>5</value>
- <tag>OZONE, CLIENT</tag>
- <description>Maximum number of retries by Ozone Client on encountering
- exception while writing a key.
- </description>
- </property>
- <property>
- <name>ozone.client.retry.interval</name>
- <value>0ms</value>
- <tag>OZONE, CLIENT</tag>
- <description>Indicates the time duration a client will wait before
- retrying a write key request on encountering an exception. By default
- there is no wait.
- </description>
- </property>
- <property>
<name>ozone.client.socket.timeout</name>
<value>5000ms</value>
<tag>OZONE, CLIENT</tag>
@@ -1543,34 +1490,6 @@
</description>
</property>
- <property>
- <name>ozone.client.checksum.type</name>
- <value>CRC32</value>
- <tag>OZONE, CLIENT, MANAGEMENT</tag>
- <description>The checksum type [NONE/ CRC32/ CRC32C/ SHA256/ MD5] determines
- which algorithm would be used to compute checksum for chunk data.
- Default checksum type is CRC32.
- </description>
- </property>
-
- <property>
- <name>ozone.client.bytes.per.checksum</name>
- <value>1MB</value>
- <tag>OZONE, CLIENT, MANAGEMENT</tag>
- <description>Checksum will be computed for every bytes per checksum number
- of bytes and stored sequentially. The minimum value for this config is
- 256KB.
- </description>
- </property>
-
- <property>
- <name>ozone.client.verify.checksum</name>
- <value>true</value>
- <tag>OZONE, CLIENT, MANAGEMENT</tag>
- <description>
- Ozone client to verify checksum of the checksum blocksize data.
- </description>
- </property>
<property>
<name>ozone.client.read.timeout</name>
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
index 6d25cf8..469faac 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
@@ -17,8 +17,13 @@
*/
package org.apache.hadoop.hdds.ratis;
+import java.util.Random;
+import java.util.UUID;
+import java.util.function.BiFunction;
+
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.KeyValue;
@@ -29,14 +34,11 @@ import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.WriteChunk
import org.apache.hadoop.ozone.common.Checksum;
import org.apache.hadoop.ozone.common.ChecksumData;
import org.apache.hadoop.ozone.common.OzoneChecksumException;
+
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.junit.Assert;
import org.junit.Test;
-import java.util.Random;
-import java.util.UUID;
-import java.util.function.BiFunction;
-
/** Testing {@link ContainerCommandRequestMessage}. */
public class TestContainerCommandRequestMessage {
static final Random RANDOM = new Random();
@@ -51,7 +53,8 @@ public class TestContainerCommandRequestMessage {
static ChecksumData checksum(ByteString data) {
try {
- return new Checksum().computeChecksum(data.asReadOnlyByteBuffer());
+ return new Checksum(ChecksumType.CRC32, 1024 * 1024)
+ .computeChecksum(data.asReadOnlyByteBuffer());
} catch (OzoneChecksumException e) {
throw new IllegalStateException(e);
}
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java
index 653300e..229390e 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationReflectionUtil.java
@@ -94,6 +94,16 @@ public final class ConfigurationReflectionUtil {
forcedFieldSet(field, configuration,
from.getTimeDuration(key, "0s", configAnnotation.timeUnit()));
break;
+ case SIZE:
+ final long value =
+ Math.round(from.getStorageSize(key, "0b", StorageUnit.BYTES));
+ if (field.getType() == int.class) {
+ forcedFieldSet(field, configuration, (int) value);
+ } else {
+ forcedFieldSet(field, configuration, value);
+
+ }
+ break;
case CLASS:
forcedFieldSet(field, configuration,
from.getClass(key, Object.class));
@@ -233,6 +243,10 @@ public final class ConfigurationReflectionUtil {
config.setTimeDuration(key, field.getLong(configObject),
configAnnotation.timeUnit());
break;
+ case SIZE:
+ config.setStorageSize(key, field.getLong(configObject),
+ StorageUnit.BYTES);
+ break;
case CLASS:
Object valueClass = field.get(configObject);
if (valueClass instanceof Class<?>) {
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
index 12903cf..acc6614 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationTarget.java
@@ -44,6 +44,10 @@ public interface ConfigurationTarget {
set(name, value + ParsedTimeDuration.unitFor(unit).suffix());
}
+ default void setStorageSize(String name, long value, StorageUnit unit) {
+ set(name, value + unit.getShortName());
+ }
+
default <T> void setFromObject(T object) {
ConfigGroup configGroup =
object.getClass().getAnnotation(ConfigGroup.class);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index b8ebaec..5b25dba 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -25,11 +25,11 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
-import com.google.common.base.Strings;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsUtils;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto.Builder;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandResponseProto;
@@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
import org.apache.hadoop.security.token.Token;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
@@ -109,7 +110,8 @@ public final class ContainerTestHelper {
*/
public static void setDataChecksum(ChunkInfo info, ChunkBuffer data)
throws OzoneChecksumException {
- Checksum checksum = new Checksum();
+ Checksum checksum = new Checksum(ChecksumType.CRC32,
+ 1024 * 1024);
info.setChecksumData(checksum.computeChecksum(data));
data.rewind();
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
index 785ed8b..22fa88f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerPersistence.java
@@ -17,11 +17,23 @@
package org.apache.hadoop.ozone.container.common.impl;
-import com.google.common.collect.Maps;
-import org.apache.commons.io.FileUtils;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
@@ -37,22 +49,31 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.interfaces.Container;
import org.apache.hadoop.ozone.container.common.interfaces.VolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
+import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.common.volume.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
-import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.keyvalue.impl.BlockManagerImpl;
import org.apache.hadoop.ozone.container.keyvalue.impl.ChunkManagerFactory;
-import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.ozone.container.keyvalue.interfaces.BlockManager;
+import org.apache.hadoop.ozone.container.keyvalue.interfaces.ChunkManager;
import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.ozone.container.common.utils.ReferenceCountedDB;
+
+import com.google.common.collect.Maps;
+import org.apache.commons.io.FileUtils;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
+import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -64,26 +85,6 @@ import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.NoSuchAlgorithmException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BCSID_MISMATCH;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNKNOWN_BCSID;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.getChunk;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.getData;
-import static org.apache.hadoop.ozone.container.ContainerTestHelper.setDataChecksum;
-import static org.junit.Assert.fail;
-
/**
* Simple tests to verify that container persistence works as expected. Some of
* these tests are specific to {@link KeyValueContainer}. If a new {@link
@@ -423,7 +424,7 @@ public class TestContainerPersistence {
Path dataDir = Paths.get(cNewData.getChunksPath());
// Read chunk via file system and verify.
- Checksum checksum = new Checksum();
+ Checksum checksum = new Checksum(ChecksumType.CRC32, 1024 * 1024);
// Read chunk via ReadChunk call.
for (int x = 0; x < chunkCount; x++) {
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 81bac07..8e90c54 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -24,12 +24,13 @@ import java.util.Collections;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import com.google.common.annotations.VisibleForTesting;
@@ -39,53 +40,40 @@ import com.google.common.annotations.VisibleForTesting;
* */
public final class BlockOutputStreamEntry extends OutputStream {
+ private final OzoneClientConfig config;
private OutputStream outputStream;
private BlockID blockID;
private final String key;
private final XceiverClientFactory xceiverClientManager;
private final Pipeline pipeline;
- private final ChecksumType checksumType;
- private final int bytesPerChecksum;
- private final int chunkSize;
// total number of bytes that should be written to this stream
private final long length;
// the current position of this stream 0 <= currentPosition < length
private long currentPosition;
private final Token<OzoneBlockTokenIdentifier> token;
- private final int streamBufferSize;
- private final long streamBufferFlushSize;
- private final boolean streamBufferFlushDelay;
- private final long streamBufferMaxSize;
- private final long watchTimeout;
private BufferPool bufferPool;
@SuppressWarnings({"parameternumber", "squid:S00107"})
- private BlockOutputStreamEntry(BlockID blockID, String key,
+ private BlockOutputStreamEntry(
+ BlockID blockID, String key,
XceiverClientFactory xceiverClientManager,
- Pipeline pipeline, String requestId, int chunkSize,
- long length, int streamBufferSize, long streamBufferFlushSize,
- boolean streamBufferFlushDelay, long streamBufferMaxSize,
- long watchTimeout, BufferPool bufferPool,
- ChecksumType checksumType, int bytesPerChecksum,
- Token<OzoneBlockTokenIdentifier> token) {
+ Pipeline pipeline,
+ long length,
+ BufferPool bufferPool,
+ Token<OzoneBlockTokenIdentifier> token,
+ OzoneClientConfig config
+ ) {
+ this.config = config;
this.outputStream = null;
this.blockID = blockID;
this.key = key;
this.xceiverClientManager = xceiverClientManager;
this.pipeline = pipeline;
- this.chunkSize = chunkSize;
this.token = token;
this.length = length;
this.currentPosition = 0;
- this.streamBufferSize = streamBufferSize;
- this.streamBufferFlushSize = streamBufferFlushSize;
- this.streamBufferFlushDelay = streamBufferFlushDelay;
- this.streamBufferMaxSize = streamBufferMaxSize;
- this.watchTimeout = watchTimeout;
this.bufferPool = bufferPool;
- this.checksumType = checksumType;
- this.bytesPerChecksum = bytesPerChecksum;
}
long getLength() {
@@ -108,11 +96,12 @@ public final class BlockOutputStreamEntry extends OutputStream {
*/
private void checkStream() throws IOException {
if (this.outputStream == null) {
+ if (getToken() != null) {
+ UserGroupInformation.getCurrentUser().addToken(getToken());
+ }
this.outputStream =
new BlockOutputStream(blockID, xceiverClientManager,
- pipeline, streamBufferSize, streamBufferFlushSize,
- streamBufferFlushDelay, streamBufferMaxSize, bufferPool,
- checksumType, bytesPerChecksum, token);
+ pipeline, bufferPool, config, token);
}
}
@@ -212,28 +201,10 @@ public final class BlockOutputStreamEntry extends OutputStream {
private String key;
private XceiverClientFactory xceiverClientManager;
private Pipeline pipeline;
- private String requestId;
- private int chunkSize;
private long length;
- private int streamBufferSize;
- private long streamBufferFlushSize;
- private boolean streamBufferFlushDelay;
- private long streamBufferMaxSize;
- private long watchTimeout;
private BufferPool bufferPool;
private Token<OzoneBlockTokenIdentifier> token;
- private ChecksumType checksumType;
- private int bytesPerChecksum;
-
- public Builder setChecksumType(ChecksumType type) {
- this.checksumType = type;
- return this;
- }
-
- public Builder setBytesPerChecksum(int bytes) {
- this.bytesPerChecksum = bytes;
- return this;
- }
+ private OzoneClientConfig config;
public Builder setBlockID(BlockID bID) {
this.blockID = bID;
@@ -257,48 +228,20 @@ public final class BlockOutputStreamEntry extends OutputStream {
return this;
}
- public Builder setRequestId(String request) {
- this.requestId = request;
- return this;
- }
-
- public Builder setChunkSize(int cSize) {
- this.chunkSize = cSize;
- return this;
- }
public Builder setLength(long len) {
this.length = len;
return this;
}
- public Builder setStreamBufferSize(int bufferSize) {
- this.streamBufferSize = bufferSize;
- return this;
- }
-
- public Builder setStreamBufferFlushSize(long bufferFlushSize) {
- this.streamBufferFlushSize = bufferFlushSize;
- return this;
- }
-
- public Builder setStreamBufferFlushDelay(boolean bufferFlushDelay) {
- this.streamBufferFlushDelay = bufferFlushDelay;
- return this;
- }
-
- public Builder setStreamBufferMaxSize(long bufferMaxSize) {
- this.streamBufferMaxSize = bufferMaxSize;
- return this;
- }
- public Builder setWatchTimeout(long timeout) {
- this.watchTimeout = timeout;
+ public Builder setBufferPool(BufferPool pool) {
+ this.bufferPool = pool;
return this;
}
- public Builder setbufferPool(BufferPool pool) {
- this.bufferPool = pool;
+ public Builder setConfig(OzoneClientConfig clientConfig) {
+ this.config = clientConfig;
return this;
}
@@ -308,11 +251,13 @@ public final class BlockOutputStreamEntry extends OutputStream {
}
public BlockOutputStreamEntry build() {
- return new BlockOutputStreamEntry(blockID, key,
- xceiverClientManager, pipeline, requestId, chunkSize,
- length, streamBufferSize, streamBufferFlushSize,
- streamBufferFlushDelay, streamBufferMaxSize, watchTimeout,
- bufferPool, checksumType, bytesPerChecksum, token);
+ return new BlockOutputStreamEntry(blockID,
+ key,
+ xceiverClientManager,
+ pipeline,
+ length,
+ bufferPool,
+ token, config);
}
}
@@ -337,34 +282,10 @@ public final class BlockOutputStreamEntry extends OutputStream {
return pipeline;
}
- public int getChunkSize() {
- return chunkSize;
- }
-
public long getCurrentPosition() {
return currentPosition;
}
- public int getStreamBufferSize() {
- return streamBufferSize;
- }
-
- public long getStreamBufferFlushSize() {
- return streamBufferFlushSize;
- }
-
- public boolean getStreamBufferFlushDelay() {
- return streamBufferFlushDelay;
- }
-
- public long getStreamBufferMaxSize() {
- return streamBufferMaxSize;
- }
-
- public long getWatchTimeout() {
- return watchTimeout;
- }
-
public BufferPool getBufferPool() {
return bufferPool;
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
index c9decc3..cf7e841 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntryPool.java
@@ -23,14 +23,14 @@ import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ByteStringConversion;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.storage.BufferPool;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -53,20 +53,12 @@ public class BlockOutputStreamEntryPool {
LoggerFactory.getLogger(BlockOutputStreamEntryPool.class);
private final List<BlockOutputStreamEntry> streamEntries;
+ private final OzoneClientConfig config;
private int currentStreamIndex;
private final OzoneManagerProtocol omClient;
private final OmKeyArgs keyArgs;
private final XceiverClientFactory xceiverClientFactory;
- private final int chunkSize;
private final String requestID;
- private final int streamBufferSize;
- private final long streamBufferFlushSize;
- private final boolean streamBufferFlushDelay;
- private final long streamBufferMaxSize;
- private final long watchTimeout;
- private final long blockSize;
- private final int bytesPerChecksum;
- private final ContainerProtos.ChecksumType checksumType;
private final BufferPool bufferPool;
private OmMultipartCommitUploadPartInfo commitUploadPartInfo;
private final long openID;
@@ -74,17 +66,17 @@ public class BlockOutputStreamEntryPool {
@SuppressWarnings({"parameternumber", "squid:S00107"})
public BlockOutputStreamEntryPool(
+ OzoneClientConfig config,
OzoneManagerProtocol omClient,
- int chunkSize, String requestId, HddsProtos.ReplicationFactor factor,
+ String requestId, HddsProtos.ReplicationFactor factor,
HddsProtos.ReplicationType type,
- int bufferSize, long bufferFlushSize,
- boolean bufferFlushDelay, long bufferMaxSize,
- long size, long watchTimeout, ContainerProtos.ChecksumType checksumType,
- int bytesPerChecksum, String uploadID, int partNumber,
+ String uploadID, int partNumber,
boolean isMultipart, OmKeyInfo info,
boolean unsafeByteBufferConversion,
XceiverClientFactory xceiverClientFactory, long openID
) {
+ this.config = config;
+ this.xceiverClientFactory = xceiverClientFactory;
streamEntries = new ArrayList<>();
currentStreamIndex = 0;
this.omClient = omClient;
@@ -93,38 +85,14 @@ public class BlockOutputStreamEntryPool {
.setType(type).setFactor(factor).setDataSize(info.getDataSize())
.setIsMultipartKey(isMultipart).setMultipartUploadID(uploadID)
.setMultipartUploadPartNumber(partNumber).build();
- this.xceiverClientFactory = xceiverClientFactory;
- this.chunkSize = chunkSize;
this.requestID = requestId;
- this.streamBufferSize = bufferSize;
- this.streamBufferFlushSize = bufferFlushSize;
- this.streamBufferFlushDelay = bufferFlushDelay;
- this.streamBufferMaxSize = bufferMaxSize;
- this.blockSize = size;
- this.watchTimeout = watchTimeout;
- this.bytesPerChecksum = bytesPerChecksum;
- this.checksumType = checksumType;
this.openID = openID;
this.excludeList = new ExcludeList();
- Preconditions.checkState(chunkSize > 0);
- Preconditions.checkState(streamBufferSize > 0);
- Preconditions.checkState(streamBufferFlushSize > 0);
- Preconditions.checkState(streamBufferMaxSize > 0);
- Preconditions.checkState(blockSize > 0);
- Preconditions.checkState(blockSize >= streamBufferMaxSize);
- Preconditions.checkState(streamBufferMaxSize % streamBufferFlushSize == 0,
- "expected max. buffer size (%s) to be a multiple of flush size (%s)",
- streamBufferMaxSize, streamBufferFlushSize);
- Preconditions.checkState(streamBufferFlushSize % streamBufferSize == 0,
- "expected flush size (%s) to be a multiple of buffer size (%s)",
- streamBufferFlushSize, streamBufferSize);
- Preconditions.checkState(chunkSize % streamBufferSize == 0,
- "expected chunk size (%s) to be a multiple of buffer size (%s)",
- chunkSize, streamBufferSize);
this.bufferPool =
- new BufferPool(streamBufferSize,
- (int) (streamBufferMaxSize / streamBufferSize),
+ new BufferPool(config.getStreamBufferSize(),
+ (int) (config.getStreamBufferMaxSize() / config
+ .getStreamBufferSize()),
ByteStringConversion
.createByteBufferConversion(unsafeByteBufferConversion));
}
@@ -140,19 +108,16 @@ public class BlockOutputStreamEntryPool {
omClient = null;
keyArgs = null;
xceiverClientFactory = null;
- chunkSize = 0;
+ config =
+ new OzoneConfiguration().getObject(OzoneClientConfig.class);
+ config.setStreamBufferSize(0);
+ config.setStreamBufferMaxSize(0);
+ config.setStreamBufferFlushSize(0);
+ config.setStreamBufferFlushDelay(false);
requestID = null;
- streamBufferSize = 0;
- streamBufferFlushSize = 0;
- streamBufferFlushDelay = false;
- streamBufferMaxSize = 0;
+ int chunkSize = 0;
bufferPool = new BufferPool(chunkSize, 1);
- watchTimeout = 0;
- blockSize = 0;
- this.checksumType = ContainerProtos.ChecksumType.valueOf(
- OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
- this.bytesPerChecksum = OzoneConfigKeys
- .OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT_BYTES; // Default is 1MB
+
currentStreamIndex = 0;
openID = -1;
excludeList = new ExcludeList();
@@ -189,17 +154,9 @@ public class BlockOutputStreamEntryPool {
.setKey(keyArgs.getKeyName())
.setXceiverClientManager(xceiverClientFactory)
.setPipeline(subKeyInfo.getPipeline())
- .setRequestId(requestID)
- .setChunkSize(chunkSize)
+ .setConfig(config)
.setLength(subKeyInfo.getLength())
- .setStreamBufferSize(streamBufferSize)
- .setStreamBufferFlushSize(streamBufferFlushSize)
- .setStreamBufferFlushDelay(streamBufferFlushDelay)
- .setStreamBufferMaxSize(streamBufferMaxSize)
- .setWatchTimeout(watchTimeout)
- .setbufferPool(bufferPool)
- .setChecksumType(checksumType)
- .setBytesPerChecksum(bytesPerChecksum)
+ .setBufferPool(bufferPool)
.setToken(subKeyInfo.getToken());
streamEntries.add(builder.build());
}
@@ -363,10 +320,6 @@ public class BlockOutputStreamEntryPool {
return excludeList;
}
- public long getStreamBufferMaxSize() {
- return streamBufferMaxSize;
- }
-
boolean isEmpty() {
return streamEntries.isEmpty();
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 03cdb72..b2a4e92 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -29,9 +29,9 @@ import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientFactory;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -67,6 +67,8 @@ import org.slf4j.LoggerFactory;
*/
public class KeyOutputStream extends OutputStream {
+ private OzoneClientConfig config;
+
/**
* Defines stream action while calling handleFlushOrClose.
*/
@@ -126,29 +128,33 @@ public class KeyOutputStream extends OutputStream {
}
@SuppressWarnings({"parameternumber", "squid:S00107"})
- public KeyOutputStream(OpenKeySession handler,
+ public KeyOutputStream(
+ OzoneClientConfig config,
+ OpenKeySession handler,
XceiverClientFactory xceiverClientManager,
OzoneManagerProtocol omClient, int chunkSize,
String requestId, ReplicationFactor factor, ReplicationType type,
- int bufferSize, long bufferFlushSize, boolean isBufferFlushDelay,
- long bufferMaxSize, long size, long watchTimeout,
- ChecksumType checksumType, int bytesPerChecksum,
String uploadID, int partNumber, boolean isMultipart,
- int maxRetryCount, long retryInterval,
- boolean unsafeByteBufferConversion) {
+ boolean unsafeByteBufferConversion
+ ) {
+ this.config = config;
OmKeyInfo info = handler.getKeyInfo();
blockOutputStreamEntryPool =
- new BlockOutputStreamEntryPool(omClient, chunkSize, requestId, factor,
- type, bufferSize, bufferFlushSize, isBufferFlushDelay,
- bufferMaxSize, size,
- watchTimeout, checksumType, bytesPerChecksum, uploadID, partNumber,
- isMultipart, info, unsafeByteBufferConversion,
- xceiverClientManager, handler.getId());
+ new BlockOutputStreamEntryPool(
+ config,
+ omClient,
+ requestId, factor, type,
+ uploadID, partNumber,
+ isMultipart, info,
+ unsafeByteBufferConversion,
+ xceiverClientManager,
+ handler.getId());
+
// Retrieve the file encryption key info, null if file is not in
// encrypted bucket.
this.feInfo = info.getFileEncryptionInfo();
this.retryPolicyMap = HddsClientUtils.getRetryPolicyByException(
- maxRetryCount, retryInterval);
+ config.getMaxRetryCount(), config.getRetryInterval());
this.retryCount = 0;
this.isException = false;
this.writeOffset = 0;
@@ -258,8 +264,8 @@ public class KeyOutputStream extends OutputStream {
// to or less than the max length of the buffer allocated.
// The len specified here is the combined sum of the data length of
// the buffers
- Preconditions.checkState(!retry || len <= blockOutputStreamEntryPool
- .getStreamBufferMaxSize());
+ Preconditions.checkState(!retry || len <= config
+ .getStreamBufferMaxSize());
int dataWritten = (int) (current.getWrittenDataLength() - currentPos);
writeLen = retry ? (int) len : dataWritten;
// In retry path, the data written is already accounted in offset.
@@ -310,7 +316,7 @@ public class KeyOutputStream extends OutputStream {
pipeline, totalSuccessfulFlushedData, bufferedDataLen, retryCount);
}
Preconditions.checkArgument(
- bufferedDataLen <= blockOutputStreamEntryPool.getStreamBufferMaxSize());
+ bufferedDataLen <= config.getStreamBufferMaxSize());
Preconditions.checkArgument(
offset - blockOutputStreamEntryPool.getKeyLength() == bufferedDataLen);
long containerId = streamEntry.getBlockID().getContainerID();
@@ -549,20 +555,11 @@ public class KeyOutputStream extends OutputStream {
private String requestID;
private ReplicationType type;
private ReplicationFactor factor;
- private int streamBufferSize;
- private long streamBufferFlushSize;
- private boolean streamBufferFlushDelay;
- private long streamBufferMaxSize;
- private long blockSize;
- private long watchTimeout;
- private ChecksumType checksumType;
- private int bytesPerChecksum;
private String multipartUploadID;
private int multipartNumber;
private boolean isMultipartKey;
- private int maxRetryCount;
- private long retryInterval;
private boolean unsafeByteBufferConversion;
+ private OzoneClientConfig clientConfig;
public Builder setMultipartUploadID(String uploadID) {
this.multipartUploadID = uploadID;
@@ -609,53 +606,13 @@ public class KeyOutputStream extends OutputStream {
return this;
}
- public Builder setStreamBufferSize(int size) {
- this.streamBufferSize = size;
- return this;
- }
-
- public Builder setStreamBufferFlushSize(long size) {
- this.streamBufferFlushSize = size;
- return this;
- }
-
- public Builder setStreamBufferFlushDelay(boolean isDelay) {
- this.streamBufferFlushDelay = isDelay;
- return this;
- }
-
- public Builder setStreamBufferMaxSize(long size) {
- this.streamBufferMaxSize = size;
- return this;
- }
-
- public Builder setBlockSize(long size) {
- this.blockSize = size;
- return this;
- }
-
- public Builder setChecksumType(ChecksumType cType) {
- this.checksumType = cType;
- return this;
- }
-
- public Builder setBytesPerChecksum(int bytes) {
- this.bytesPerChecksum = bytes;
- return this;
- }
-
public Builder setIsMultipartKey(boolean isMultipart) {
this.isMultipartKey = isMultipart;
return this;
}
- public Builder setMaxRetryCount(int maxCount) {
- this.maxRetryCount = maxCount;
- return this;
- }
-
- public Builder setRetryInterval(long retryIntervalInMS) {
- this.retryInterval = retryIntervalInMS;
+ public Builder setConfig(OzoneClientConfig config) {
+ this.clientConfig = config;
return this;
}
@@ -665,13 +622,19 @@ public class KeyOutputStream extends OutputStream {
}
public KeyOutputStream build() {
- return new KeyOutputStream(openHandler, xceiverManager, omClient,
- chunkSize, requestID, factor, type,
- streamBufferSize, streamBufferFlushSize, streamBufferFlushDelay,
- streamBufferMaxSize,
- blockSize, watchTimeout, checksumType,
- bytesPerChecksum, multipartUploadID, multipartNumber, isMultipartKey,
- maxRetryCount, retryInterval, unsafeByteBufferConversion);
+ return new KeyOutputStream(
+ clientConfig,
+ openHandler,
+ xceiverManager,
+ omClient,
+ chunkSize,
+ requestID,
+ factor,
+ type,
+ multipartUploadID,
+ multipartNumber,
+ isMultipartKey,
+ unsafeByteBufferConversion);
}
}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index c61d0eb..8c0ed41 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.StorageType;
-import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -104,7 +104,6 @@ import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
import org.apache.hadoop.ozone.security.acl.OzoneAclConfig;
import org.apache.hadoop.ozone.security.acl.OzoneObj;
-import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@@ -130,31 +129,24 @@ public class RpcClient implements ClientProtocol {
private final OzoneManagerProtocol ozoneManagerClient;
private final XceiverClientManager xceiverClientManager;
private final int chunkSize;
- private final ChecksumType checksumType;
- private final int bytesPerChecksum;
- private final boolean unsafeByteBufferConversion;
- private boolean verifyChecksum;
private final UserGroupInformation ugi;
private final ACLType userRights;
private final ACLType groupRights;
- private final int streamBufferSize;
- private final long streamBufferFlushSize;
- private boolean streamBufferFlushDelay;
- private final long streamBufferMaxSize;
private final long blockSize;
private final ClientId clientId = ClientId.randomId();
- private final int maxRetryCount;
- private final long retryInterval;
+ private final boolean unsafeByteBufferConversion;
private Text dtService;
private final boolean topologyAwareReadEnabled;
private final boolean checkKeyNameEnabled;
+ private final OzoneClientConfig clientConfig;
/**
- * Creates RpcClient instance with the given configuration.
- * @param conf Configuration
- * @param omServiceId OM HA Service ID, set this to null if not HA
- * @throws IOException
- */
+ * Creates RpcClient instance with the given configuration.
+ *
+ * @param conf Configuration
+ * @param omServiceId OM HA Service ID, set this to null if not HA
+ * @throws IOException
+ */
public RpcClient(ConfigurationSource conf, String omServiceId)
throws IOException {
Preconditions.checkNotNull(conf);
@@ -165,6 +157,8 @@ public class RpcClient implements ClientProtocol {
this.userRights = aclConfig.getUserDefaultRights();
this.groupRights = aclConfig.getGroupDefaultRights();
+ this.clientConfig = conf.getObject(OzoneClientConfig.class);
+
OmTransport omTransport = OmTransportFactory.create(conf, ugi, omServiceId);
this.ozoneManagerClient = TracingUtil.createProxy(
@@ -194,57 +188,14 @@ public class RpcClient implements ClientProtocol {
} else {
chunkSize = configuredChunkSize;
}
- streamBufferSize = (int) conf
- .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE,
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE_DEFAULT,
- StorageUnit.BYTES);
- streamBufferFlushSize = (long) conf
- .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE_DEFAULT,
- StorageUnit.BYTES);
- streamBufferFlushDelay = conf.getBoolean(
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY,
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY_DEFAULT);
- streamBufferMaxSize = (long) conf
- .getStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT,
- StorageUnit.BYTES);
+
blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
unsafeByteBufferConversion = conf.getBoolean(
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED,
OzoneConfigKeys.OZONE_UNSAFEBYTEOPERATIONS_ENABLED_DEFAULT);
-
- int configuredChecksumSize = (int) conf.getStorageSize(
- OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
- OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_DEFAULT,
- StorageUnit.BYTES);
- if(configuredChecksumSize <
- OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE) {
- LOG.warn("The checksum size ({}) is not allowed to be less than the " +
- "minimum size ({}), resetting to the minimum size.",
- configuredChecksumSize,
- OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE);
- bytesPerChecksum =
- OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM_MIN_SIZE;
- } else {
- bytesPerChecksum = configuredChecksumSize;
- }
- String checksumTypeStr = conf.get(
- OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE,
- OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE_DEFAULT);
- checksumType = ChecksumType.valueOf(checksumTypeStr);
- this.verifyChecksum =
- conf.getBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
- OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM_DEFAULT);
- maxRetryCount =
- conf.getInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, OzoneConfigKeys.
- OZONE_CLIENT_MAX_RETRIES_DEFAULT);
- retryInterval = OzoneUtils.getTimeDurationInMS(conf,
- OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL,
- OzoneConfigKeys.OZONE_CLIENT_RETRY_INTERVAL_DEFAULT);
topologyAwareReadEnabled = conf.getBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
@@ -715,7 +666,7 @@ public class RpcClient implements ClientProtocol {
throws IOException {
verifyVolumeName(volumeName);
verifyBucketName(bucketName);
- if(checkKeyNameEnabled) {
+ if (checkKeyNameEnabled) {
HddsClientUtils.verifyKeyName(keyName);
}
HddsClientUtils.checkNotNull(keyName, type, factor);
@@ -963,21 +914,14 @@ public class RpcClient implements ClientProtocol {
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
- .setChunkSize(chunkSize)
.setRequestID(requestId)
.setType(openKey.getKeyInfo().getType())
.setFactor(openKey.getKeyInfo().getFactor())
- .setStreamBufferSize(streamBufferSize)
- .setStreamBufferFlushSize(streamBufferFlushSize)
- .setStreamBufferMaxSize(streamBufferMaxSize)
- .setBlockSize(blockSize)
- .setBytesPerChecksum(bytesPerChecksum)
- .setChecksumType(checksumType)
.setMultipartNumber(partNumber)
.setMultipartUploadID(uploadID)
.setIsMultipartKey(true)
- .setMaxRetryCount(maxRetryCount)
- .setRetryInterval(retryInterval)
+ .enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+ .setConfig(clientConfig)
.build();
keyOutputStream.addPreallocateBlocks(
openKey.getKeyInfo().getLatestVersionLocations(),
@@ -1234,7 +1178,7 @@ public class RpcClient implements ClientProtocol {
throws IOException {
LengthInputStream lengthInputStream = KeyInputStream
.getFromOmKeyInfo(keyInfo, xceiverClientManager,
- verifyChecksum, retryFunction);
+ clientConfig.isChecksumVerify(), retryFunction);
FileEncryptionInfo feInfo = keyInfo.getFileEncryptionInfo();
if (feInfo != null) {
final KeyProvider.KeyVersion decrypted = getDEK(feInfo);
@@ -1271,20 +1215,11 @@ public class RpcClient implements ClientProtocol {
.setHandler(openKey)
.setXceiverClientManager(xceiverClientManager)
.setOmClient(ozoneManagerClient)
- .setChunkSize(chunkSize)
.setRequestID(requestId)
.setType(HddsProtos.ReplicationType.valueOf(type.toString()))
.setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
- .setStreamBufferSize(streamBufferSize)
- .setStreamBufferFlushSize(streamBufferFlushSize)
- .setStreamBufferFlushDelay(streamBufferFlushDelay)
- .setStreamBufferMaxSize(streamBufferMaxSize)
- .setBlockSize(blockSize)
- .setChecksumType(checksumType)
- .setBytesPerChecksum(bytesPerChecksum)
- .setMaxRetryCount(maxRetryCount)
- .setRetryInterval(retryInterval)
.enableUnsafeByteBufferConversion(unsafeByteBufferConversion)
+ .setConfig(clientConfig)
.build();
keyOutputStream
.addPreallocateBlocks(openKey.getKeyInfo().getLatestVersionLocations(),
diff --git a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
index 3267976..f0dfba8 100644
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/src/test/java/org/apache/hadoop/ozone/MiniOzoneChaosCluster.java
@@ -18,20 +18,22 @@
package org.apache.hadoop.ozone;
+import java.io.IOException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
+import java.util.Collections;
import java.util.HashSet;
-import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
-import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
@@ -42,12 +44,11 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
+
+import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.concurrent.TimeUnit;
-
/**
* This class causes random failures in the chaos cluster.
*/
@@ -198,16 +199,17 @@ public class MiniOzoneChaosCluster extends MiniOzoneHAClusterImpl {
protected void initializeConfiguration() throws IOException {
super.initializeConfiguration();
+
+ OzoneClientConfig clientConfig = new OzoneClientConfig();
+ clientConfig.setStreamBufferFlushSize(8 * 1024);
+ clientConfig.setStreamBufferMaxSize(16 * 1024);
+ clientConfig.setStreamBufferSize(4 * 1024);
+ conf.setFromObject(clientConfig);
+
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
4, StorageUnit.KB);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
32, StorageUnit.KB);
- conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
- 8, StorageUnit.KB);
- conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
- 16, StorageUnit.KB);
- conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE,
- 4, StorageUnit.KB);
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
1, StorageUnit.MB);
conf.setTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT, 1000,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 1c1fcc6..629ab5a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -17,19 +17,6 @@
*/
package org.apache.hadoop.ozone;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
-import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
-import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
-import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
-
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -46,14 +33,14 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.DFSConfigKeysLegacy;
import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
@@ -79,6 +66,20 @@ import org.apache.hadoop.ozone.recon.ReconServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
+
+import org.apache.commons.io.FileUtils;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
+import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.recon.ReconConfigKeys.OZONE_RECON_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_OM_SNAPSHOT_DB_DIR;
+import static org.apache.hadoop.ozone.recon.ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
import org.hadoop.ozone.recon.codegen.ReconSqlDbConfig;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -596,14 +597,20 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
if (!streamBufferSizeUnit.isPresent()) {
streamBufferSizeUnit = Optional.of(StorageUnit.MB);
}
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferSize(
+ (int) Math.round(
+ streamBufferSizeUnit.get().toBytes(streamBufferSize.getAsInt())));
+ clientConfig.setStreamBufferMaxSize(Math.round(
+ streamBufferSizeUnit.get().toBytes(streamBufferMaxSize.get())));
+ clientConfig.setStreamBufferFlushSize(Math.round(
+ streamBufferSizeUnit.get().toBytes(streamBufferFlushSize.get())));
+ conf.setFromObject(clientConfig);
+
conf.setStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
chunkSize.get(), streamBufferSizeUnit.get());
- conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_SIZE,
- streamBufferSize.getAsInt(), streamBufferSizeUnit.get());
- conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_SIZE,
- streamBufferFlushSize.get(), streamBufferSizeUnit.get());
- conf.setStorageSize(OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE,
- streamBufferMaxSize.get(), streamBufferSizeUnit.get());
+
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
streamBufferSizeUnit.get());
// MiniOzoneCluster should have global pipeline upper limit.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 44b47dc..639a64d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -17,10 +17,17 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
@@ -33,22 +40,16 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests BlockOutputStream class.
@@ -85,13 +86,20 @@ public class TestBlockOutputStream {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(config);
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
- conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setTotalPipelineNumLimit(10)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
index 93a3ad6..ab12cd9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamFlushDelay.java
@@ -17,6 +17,11 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -33,21 +38,16 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests TestBlockOutputStreamFlushDelay class.
@@ -84,9 +84,9 @@ public class TestBlockOutputStreamFlushDelay {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 4d7a6db..8463c1d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -16,12 +16,21 @@
*/
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -37,27 +46,19 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests failure detection and handling in BlockOutputStream Class.
@@ -95,10 +96,14 @@ public class TestBlockOutputStreamWithFailures {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(config);
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 10, TimeUnit.SECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
@@ -115,7 +120,9 @@ public class TestBlockOutputStreamWithFailures {
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
conf.setFromObject(raftClientConfig);
- conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
RatisClientConfig ratisClientConfig =
conf.getObject(RatisClientConfig.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
index f1ed5b1..21e43e6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailuresFlushDelay.java
@@ -16,13 +16,21 @@
*/
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -38,20 +46,20 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
-import org.junit.*;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.time.Duration;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
-
/**
* Tests failure detection by set flush delay and handling in
* BlockOutputStream Class.
@@ -89,10 +97,14 @@ public class TestBlockOutputStreamWithFailuresFlushDelay {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(config);
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 10, TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 10, TimeUnit.SECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index 4bac4fe..12ba4e6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -17,18 +17,26 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
-import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneBucket;
-import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneVolume;
import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneInputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
@@ -37,23 +45,16 @@ import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Tests Close Container Exception handling by Ozone Client.
@@ -87,9 +88,13 @@ public class TestCloseContainerHandlingByClient {
public static void init() throws Exception {
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(config);
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index abef9f2..a658cf4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -30,8 +30,10 @@ import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -125,7 +127,10 @@ public class TestCommitWatcher {
ratisClientConfig.setWatchRequestTimeout(Duration.ofSeconds(10));
conf.setFromObject(ratisClientConfig);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
+ OzoneClientConfig clientConfig = new OzoneClientConfig();
+ clientConfig.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(clientConfig);
+
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 6431b70..3dfddfe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -17,11 +17,19 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
@@ -36,26 +44,19 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.RatisServ
import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests the containerStateMachine failure handling.
@@ -99,7 +100,11 @@ public class TestContainerStateMachine {
conf.setQuietMode(false);
OzoneManager.setTestSecureOmFlag(true);
conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
- conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
// conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
cluster =
MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 82836a3..34f6964 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -17,6 +17,19 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@@ -25,6 +38,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
@@ -44,49 +58,28 @@ import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
-import org.apache.hadoop.ozone.container.common.transport.server.ratis.
- ContainerStateMachine;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerDataProto.State.UNHEALTHY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
-
+import static org.hamcrest.core.Is.is;
import org.junit.AfterClass;
import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys.
- HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.
- HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys
- .HDDS_PIPELINE_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
- ContainerDataProto.State.UNHEALTHY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
- OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
- OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
-import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
+import org.junit.BeforeClass;
+import org.junit.Test;
/**
* Tests the containerStateMachine failure handling.
@@ -110,8 +103,11 @@ public class TestContainerStateMachineFailures {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- conf.setBoolean(OzoneConfigKeys.
- OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java
index 37e13b6..061c5e1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDiscardPreallocatedBlocks.java
@@ -17,10 +17,18 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -31,27 +39,22 @@ import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.io.*;
+import org.apache.hadoop.ozone.client.io.BlockOutputStreamEntry;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.nio.charset.StandardCharsets.UTF_8;
/**
* Tests Close Container Exception handling by Ozone Client.
@@ -86,9 +89,13 @@ public class TestDiscardPreallocatedBlocks{
public static void init() throws Exception {
chunkSize = (int) OzoneConsts.MB;
blockSize = 4 * chunkSize;
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(config);
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index a9c0706..b44427b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -122,8 +123,10 @@ public class TestFailureHandlingByClient {
raftClientConfig.setRpcWatchRequestTimeout(Duration.ofSeconds(3));
conf.setFromObject(raftClientConfig);
- conf.setBoolean(
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 7775bb7..4dbb0b6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -17,10 +17,18 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
@@ -34,26 +42,19 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-
/**
* Tests {@link KeyInputStream}.
*/
@@ -96,14 +97,18 @@ public class TestKeyInputStream {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setBytesPerChecksum(256 * 1024);
+ conf.setFromObject(config);
+
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 64,
StorageUnit.MB);
conf.set(ScmConfigKeys.OZONE_SCM_CHUNK_LAYOUT_KEY, chunkLayout.name());
- conf.setStorageSize(
- OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM, 256, StorageUnit.KB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
.setTotalPipelineNumLimit(5)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index dd871f3..d885d38 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -16,22 +16,29 @@
*/
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.
- ContainerNotOpenException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerNotOpenException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
@@ -40,24 +47,16 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY;
/**
* Tests failure detection and handling in BlockOutputStream Class.
@@ -96,12 +95,20 @@ public class TestOzoneClientRetriesOnException {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setMaxRetryCount(3);
+ config.setChecksumType(ChecksumType.NONE);
+ conf.setFromObject(config);
+
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
- conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
conf.setQuietMode(false);
- conf.setBoolean(OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
.setTotalPipelineNumLimit(10)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
index e202ca1..14bce99 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnExceptionFlushDelay.java
@@ -16,10 +16,17 @@
*/
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientSpi;
@@ -29,7 +36,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
@@ -37,17 +43,16 @@ import org.apache.hadoop.ozone.client.io.KeyOutputStream;
import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
-import org.apache.ratis.protocol.exceptions.GroupMismatchException;
-import org.junit.*;
-import org.junit.rules.Timeout;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
/**
* Tests failure detection and handling in BlockOutputStream Class by set
@@ -89,8 +94,12 @@ public class TestOzoneClientRetriesOnExceptionFlushDelay {
blockSize = 2 * maxFlushSize;
conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000,
TimeUnit.MILLISECONDS);
- conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
- conf.setInt(OzoneConfigKeys.OZONE_CLIENT_MAX_RETRIES, 3);
+
+ OzoneClientConfig config = new OzoneClientConfig();
+ config.setChecksumType(ChecksumType.NONE);
+ config.setMaxRetryCount(3);
+ conf.setFromObject(config);
+
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 3);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
index 87c19ae..17cc0ce 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClient.java
@@ -18,12 +18,14 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+
import org.junit.AfterClass;
import org.junit.BeforeClass;
-
-import java.io.IOException;
import org.junit.Rule;
import org.junit.rules.Timeout;
@@ -49,6 +51,7 @@ public class TestOzoneRpcClient extends TestOzoneRpcClientAbstract {
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setFromObject(new OzoneClientConfig());
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
startCluster(conf);
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index b7b75a4..d04e976 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.StorageType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolCli
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.OzoneTestUtils;
import org.apache.hadoop.ozone.client.BucketArgs;
@@ -106,7 +106,6 @@ import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
-
import static org.apache.hadoop.hdds.StringUtils.string2Bytes;
import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
import static org.apache.hadoop.hdds.client.ReplicationFactor.THREE;
@@ -116,13 +115,12 @@ import static org.apache.hadoop.ozone.OzoneAcl.AclScope.DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.NO_SUCH_MULTIPART_UPLOAD_ERROR;
import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.PARTIAL_RENAME;
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.GROUP;
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType.USER;
import static org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType.READ;
-
import org.junit.Assert;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -130,7 +128,6 @@ import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import org.junit.Test;
/**
@@ -1466,9 +1463,14 @@ public abstract class TestOzoneRpcClientAbstract {
private void readCorruptedKey(String volumeName, String bucketName,
String keyName, boolean verifyChecksum) {
try {
+
OzoneConfiguration configuration = cluster.getConf();
- configuration.setBoolean(OzoneConfigKeys.OZONE_CLIENT_VERIFY_CHECKSUM,
- verifyChecksum);
+
+ final OzoneClientConfig clientConfig =
+ configuration.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumVerify(verifyChecksum);
+ configuration.setFromObject(clientConfig);
+
RpcClient client = new RpcClient(configuration, null);
OzoneInputStream is = client.getKey(volumeName, bucketName, keyName);
is.read(new byte[100]);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
index 10400b3..ac84f17 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientWithRatis.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.client.ObjectStore;
@@ -62,6 +63,7 @@ public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ conf.setFromObject(new OzoneClientConfig());
conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, 1);
conf.setBoolean(ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
false);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
index 84416d2..483823c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestValidateBCSIDOnRestart.java
@@ -17,6 +17,14 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
@@ -25,10 +33,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -45,6 +53,11 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.BlockUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.junit.AfterClass;
@@ -52,25 +65,6 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.hdds.HddsConfigKeys.
- HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.
- HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys
- .HDDS_PIPELINE_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
- OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
- OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
-
/**
* Tests the containerStateMachine failure handling.
*/
@@ -92,8 +86,11 @@ public class TestValidateBCSIDOnRestart {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- conf.setBoolean(OzoneConfigKeys.
- OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index c92db45..9058d34 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -17,6 +17,17 @@
package org.apache.hadoop.ozone.client.rpc;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.DatanodeRatisServerConfig;
@@ -24,7 +35,11 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.ratis.conf.RatisClientConfig;
-import org.apache.hadoop.hdds.scm.*;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.XceiverClientRatis;
+import org.apache.hadoop.hdds.scm.XceiverClientReply;
+import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -32,7 +47,6 @@ import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolCli
import org.apache.hadoop.hdds.scm.storage.BlockOutputStream;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.client.ObjectStore;
import org.apache.hadoop.ozone.client.OzoneClient;
@@ -42,27 +56,16 @@ import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
import org.apache.hadoop.ozone.container.ContainerTestHelper;
import org.apache.hadoop.ozone.container.TestHelper;
import org.apache.hadoop.test.GenericTestUtils;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.After;
import org.junit.Test;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
-
/**
* This class verifies the watchForCommit Handling by xceiverClient.
*/
@@ -96,8 +99,11 @@ public class TestWatchForCommit {
flushSize = 2 * chunkSize;
maxFlushSize = 2 * flushSize;
blockSize = 2 * maxFlushSize;
- conf.setBoolean(
- OzoneConfigKeys.OZONE_CLIENT_STREAM_BUFFER_FLUSH_DELAY, false);
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
TimeUnit.SECONDS);
conf.setQuietMode(false);
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java
index 0bdb4a3..620fd85 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeBlockPutter.java
@@ -19,12 +19,12 @@ package org.apache.hadoop.ozone.freon;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.BlockData;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumData;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChecksumType;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ChunkInfo;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.PutBlockRequestProto;
@@ -34,9 +34,10 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.common.Checksum;
import com.codahale.metrics.Timer;
-import org.apache.hadoop.ozone.common.Checksum;
+import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
@@ -105,7 +106,7 @@ public class DatanodeBlockPutter extends BaseFreonGenerator implements
byte[] data = RandomStringUtils.randomAscii(chunkSize)
.getBytes(StandardCharsets.UTF_8);
- Checksum checksum = new Checksum();
+ Checksum checksum = new Checksum(ChecksumType.CRC32, 1024 * 1024);
checksumProtobuf = checksum.computeChecksum(data).getProtoBufMessage();
runTests(this::putBlock);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org