You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/03/25 11:36:35 UTC

[ozone] 01/01: Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 9ec90f5820aa9143393f5aa392ed9c1f234e9c2c
Merge: f386662 75f5501
Author: Doroszlai, Attila <ad...@apache.org>
AuthorDate: Thu Mar 24 19:19:08 2022 +0100

    Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master

 .github/workflows/post-commit.yml                  |  12 +-
 dev-support/annotations/pom.xml                    | 114 +++++
 .../RequestFeatureValidatorProcessor.java          | 289 ++++++++++++
 .../org/apache/ozone/annotations/package-info.java |   5 +
 .../services/javax.annotation.processing.Processor |  19 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    |   2 +-
 hadoop-hdds/common/pom.xml                         |  13 +-
 ...DatanodeVersions.java => ComponentVersion.java} |  24 +-
 .../org/apache/hadoop/hdds/DatanodeVersion.java    |  71 +++
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   1 -
 .../hadoop/hdds/client/RatisReplicationConfig.java |  34 +-
 .../hadoop/hdds/client/ReplicationConfig.java      |   4 +-
 .../hdds/client/StandaloneReplicationConfig.java   |  34 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  13 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  66 ++-
 .../RequestTypeDependentRetryPolicyCreator.java    |   6 +-
 .../hadoop/hdds/scm/container/ContainerInfo.java   |  21 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |   9 +-
 .../apache/hadoop/hdds/scm/net/InnerNodeImpl.java  |   2 +-
 .../protocol/StorageContainerLocationProtocol.java |  11 +-
 .../org/apache/hadoop/ozone/ClientVersion.java     |  75 +++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  32 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   4 +
 .../apache/hadoop/ozone/OzoneManagerVersion.java   |  72 +++
 .../common/src/main/resources/ozone-default.xml    |  35 +-
 .../hdds/TestComponentVersionInvariants.java       |  98 ++++
 .../client/TestReplicationConfigValidator.java     |  16 +-
 .../hadoop/hdds/protocol/TestDatanodeDetails.java  |  11 +-
 .../hdds/scm/container/TestContainerInfo.java      |   2 +-
 .../hadoop/hdds/scm/pipeline/MockPipeline.java     |   4 +-
 .../hadoop/hdds/scm/pipeline/TestPipeline.java     |   9 +-
 .../ozone/container/ContainerTestHelper.java       |  68 ---
 .../TestDefaultUpgradeFinalizationExecutor.java    |   3 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   8 +-
 .../ozone/container/common/impl/ContainerData.java |  46 +-
 .../ozone/container/common/impl/ContainerSet.java  |  16 +-
 .../common/interfaces/ContainerInspector.java      |  72 +++
 .../ozone/container/common/interfaces/Handler.java |  12 +-
 .../common/report/ContainerReportPublisher.java    |   5 +-
 .../common/report/IncrementalReportSender.java     |  30 ++
 .../common/statemachine/DatanodeStateMachine.java  |   6 +-
 .../common/statemachine/StateContext.java          | 131 +++---
 .../CreatePipelineCommandHandler.java              |  13 +-
 .../states/datanode/RunningDatanodeState.java      |   8 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |  23 +-
 .../server/ratis/ContainerStateMachine.java        |   7 +-
 .../transport/server/ratis/XceiverServerRatis.java |  18 +-
 .../common/utils/ContainerInspectorUtil.java       |  87 ++++
 .../container/common/volume/AbstractFuture.java    |  13 +-
 .../container/keyvalue/KeyValueContainer.java      |  76 ++-
 .../container/keyvalue/KeyValueContainerData.java  |  10 +-
 .../KeyValueContainerMetadataInspector.java        | 463 ++++++++++++++++++
 .../ozone/container/keyvalue/KeyValueHandler.java  |  95 ++--
 .../container/keyvalue/TarContainerPacker.java     |  55 ++-
 .../container/keyvalue/helpers/ChunkUtils.java     |   4 +
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  23 +-
 .../container/keyvalue/impl/BlockManagerImpl.java  | 104 ++--
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  18 +-
 .../background/BlockDeletingService.java           | 108 +++--
 .../container/metadata/AbstractDatanodeStore.java  |   6 +-
 .../ozone/container/ozoneimpl/ContainerReader.java |   6 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  40 +-
 .../container/replication/MeasuredReplicator.java  |   3 +-
 .../commands/RefreshVolumeUsageCommand.java        |  57 +++
 .../hadoop/ozone/container/common/ScmTestMock.java |   7 +-
 .../container/common/TestBlockDeletingService.java |   6 +-
 .../common/TestKeyValueContainerData.java          |   6 +-
 .../TestSchemaOneBackwardsCompatibility.java       |   2 +-
 .../common/helpers/TestDatanodeVersionFile.java    |   4 +-
 .../impl/TestContainerDeletionChoosingPolicy.java  |   2 +-
 .../common/impl/TestContainerPersistence.java      |  47 --
 .../container/common/impl/TestHddsDispatcher.java  |   7 +-
 .../common/statemachine/TestStateContext.java      |  71 ++-
 .../TestCreatePipelineCommandHandler.java          |  36 +-
 .../common/volume/TestStorageVolumeChecker.java    |  12 +-
 .../keyvalue/TestKeyValueBlockIterator.java        |  10 +-
 .../container/keyvalue/TestKeyValueContainer.java  |  67 ++-
 .../keyvalue/TestKeyValueContainerCheck.java       | 158 +------
 ...a => TestKeyValueContainerIntegrityChecks.java} | 160 ++-----
 .../TestKeyValueContainerMetadataInspector.java    | 360 ++++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java    |  19 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |  14 -
 .../keyvalue/impl/CommonChunkManagerTestCases.java |   1 -
 .../keyvalue/impl/TestBlockManagerImpl.java        |  57 +--
 .../container/ozoneimpl/TestContainerReader.java   |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   6 +-
 .../replication/TestMeasuredReplicator.java        |  15 +
 .../testutils/BlockDeletingServiceTestImpl.java    |   2 +-
 hadoop-hdds/dev-support/checkstyle/checkstyle.xml  |   1 +
 hadoop-hdds/docs/content/concept/Containers.md     |   3 +-
 hadoop-hdds/docs/content/concept/Datanodes.md      |   4 +-
 hadoop-hdds/docs/content/concept/OzoneManager.md   |   6 +-
 hadoop-hdds/docs/content/concept/Recon.md          |   9 +-
 hadoop-hdds/docs/content/feature/OM-HA.md          |   4 +-
 hadoop-hdds/docs/content/feature/PrefixFSO.md      |  68 ++-
 hadoop-hdds/docs/content/feature/SCM-HA.md         |   2 +-
 hadoop-hdds/docs/content/interface/O3fs.md         |   2 +-
 hadoop-hdds/docs/content/interface/O3fs.zh.md      |   4 +-
 hadoop-hdds/docs/content/interface/Ofs.md          |   2 +-
 hadoop-hdds/docs/content/tools/TestTools.md        |   4 +-
 hadoop-hdds/docs/content/tools/TestTools.zh.md     |   4 +-
 hadoop-hdds/docs/dev-support/bin/generate-site.sh  |  12 +-
 .../docs/dev-support/bin/make_images_responsive.py |  57 +++
 .../themes/ozonedoc/layouts/shortcodes/image.html  |   2 +-
 hadoop-hdds/framework/pom.xml                      |   8 +
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |  12 +
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  37 ++
 ...lockLocationProtocolClientSideTranslatorPB.java |   4 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |   9 +-
 .../scm/update/client/UpdateServiceConfig.java     |   5 +-
 .../authority/PKIProfiles/DefaultProfile.java      |  32 +-
 ...ateClient.java => CommonCertificateClient.java} | 116 ++---
 .../certificate/client/OMCertificateClient.java    |  79 +---
 .../client/ReconCertificateClient.java}            |  38 +-
 ...va => FixedThreadPoolWithAffinityExecutor.java} |  70 ++-
 .../hadoop/hdds/utils/db/DBUpdatesWrapper.java     |   9 +
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   1 +
 .../hadoop/hdds/utils/db/cache/EpochEntry.java     |  75 ---
 .../hadoop/hdds/utils/db/cache/FullTableCache.java |  50 +-
 .../hdds/utils/db/cache/PartialTableCache.java     |  56 +--
 .../hadoop/hdds/utils/db/cache/TableCache.java     |   4 +-
 .../client/TestDefaultCertificateClient.java       |   2 +-
 .../hadoop/hdds/server/events/TestEventQueue.java  |  35 +-
 .../hadoop/hdds/server/http/TestHtmlQuoting.java   |   5 +-
 .../hadoop/hdds/utils/db/cache/TestTableCache.java |  77 ++-
 hadoop-hdds/interface-client/pom.xml               |   5 +
 .../src/main/proto/DatanodeClientProtocol.proto    |   4 +-
 .../interface-client/src/main/proto/hdds.proto     |   7 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  11 +-
 .../src/main/proto/ScmServerSecurityProtocol.proto |   8 +-
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |   2 +-
 .../container/AbstractContainerReportHandler.java  |  43 +-
 .../hdds/scm/container/ContainerManager.java       |   7 +
 .../hdds/scm/container/ContainerManagerImpl.java   |  78 +--
 .../hdds/scm/container/ContainerReportHandler.java | 141 ++++--
 .../hdds/scm/container/ContainerStateManager.java  |  10 +-
 .../scm/container/ContainerStateManagerImpl.java   |  24 +-
 .../IncrementalContainerReportHandler.java         |  18 +-
 .../hdds/scm/container/ReplicationManager.java     |   4 +-
 .../scm/container/balancer/ContainerBalancer.java  | 133 ++++--
 .../balancer/ContainerBalancerConfiguration.java   |  23 +-
 .../balancer/ContainerBalancerMetrics.java         | 139 +++---
 .../scm/container/states/ContainerStateMap.java    |  27 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   2 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java |  37 +-
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     |   4 +-
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |   8 +-
 .../hdds/scm/ha/SCMDBCheckpointProvider.java       |   2 +-
 ...ffer.java => SCMHADBTransactionBufferStub.java} |   8 +-
 ...MockSCMHAManager.java => SCMHAManagerStub.java} |  24 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   4 +-
 .../hadoop/hdds/scm/ha/SCMServiceManager.java      |   3 +-
 .../hdds/scm/metadata/MoveDataNodePairCodec.java   |   6 +-
 .../hadoop/hdds/scm/metadata/PipelineCodec.java    |   6 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   9 +-
 .../hadoop/hdds/scm/node/DatanodeUsageInfo.java    |  15 +-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  18 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  18 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  34 +-
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   2 +-
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   4 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |   4 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  54 +--
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  11 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   6 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   3 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  28 ++
 ...inerLocationProtocolServerSideTranslatorPB.java |   9 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |   2 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |   2 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  12 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  20 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  12 +-
 .../hadoop/hdds/scm/server/SCMPolicyProvider.java  |   5 +-
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  16 +-
 .../hadoop/hdds/scm/server/SCMStorageConfig.java   |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |  51 +-
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  14 +-
 .../hadoop/hdds/scm/TestHddsServerUtils.java       |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   7 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |   8 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  21 +
 .../hdds/scm/container/SimpleMockNodeManager.java  |  20 +-
 .../container/TestCloseContainerEventHandler.java  |   8 +-
 .../scm/container/TestContainerManagerImpl.java    |  26 +-
 .../scm/container/TestContainerReportHandler.java  |  34 +-
 .../scm/container/TestContainerStateManager.java   |  14 +-
 .../TestIncrementalContainerReportHandler.java     |  30 +-
 .../hdds/scm/container/TestReplicationManager.java | 140 +++---
 .../scm/container/TestUnknownContainerReport.java  |   4 +-
 .../container/balancer/TestContainerBalancer.java  |  84 ++--
 .../states/TestContainerReplicaCount.java          |   5 +-
 .../hdds/scm/ha/TestReplicationAnnotation.java     |  10 +-
 .../hdds/scm/ha/TestSequenceIDGenerator.java       |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   6 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   7 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   3 +-
 .../hadoop/hdds/scm/node/TestNodeStateManager.java |  24 +
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  12 +-
 .../hdds/scm/node/TestSCMNodeStorageStatMap.java   |   5 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   4 +-
 .../TestPipelineDatanodesIntersection.java         |  13 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 129 ++---
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 158 ++++++-
 .../scm/pipeline/TestPipelineStateManagerImpl.java |  62 +--
 .../hdds/scm/pipeline/TestPipelineStateMap.java    |  12 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  57 ++-
 .../scm/pipeline/TestSimplePipelineProvider.java   |  18 +-
 .../pipeline/TestWritableECContainerProvider.java  |  16 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  20 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |  10 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  22 +-
 .../scm/server/TestSCMBlockProtocolServer.java     |  10 +-
 .../server/TestSCMDatanodeHeartbeatDispatcher.java |  54 +--
 .../server/TestStorageContainerManagerStarter.java |   6 +-
 .../testutils/ReplicationNodeManagerMock.java      |  14 +
 hadoop-hdds/test-utils/pom.xml                     |   5 +
 .../org/apache/ozone/test/GenericTestUtils.java    |  18 +-
 .../java/org/apache/ozone/test/tag/Flaky.java}     |  36 +-
 .../main/java/org/apache/ozone/test/tag/Slow.java} |  35 +-
 .../org/apache/ozone/test/tag/package-info.java}   |  23 +-
 .../scm/cli/ContainerBalancerStartSubcommand.java  |   8 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |   9 +-
 .../hadoop/hdds/scm/cli/cert/CertCommands.java     |   4 +-
 .../hdds/scm/cli/datanode/ListInfoSubcommand.java  |  38 +-
 .../hdds/scm/cli/container/TestInfoSubCommand.java |   6 +-
 .../datanode/TestContainerBalancerSubCommand.java  |   6 +-
 .../cli/pipeline/TestListPipelinesSubCommand.java  |  12 +-
 .../hadoop/ozone/client/OzoneClientFactory.java    |   5 +-
 .../client/checksum/BaseFileChecksumHelper.java    |  54 ++-
 .../checksum/ReplicatedFileChecksumHelper.java     |  44 +-
 .../client/io/BlockOutputStreamEntryPool.java      |   1 -
 .../hadoop/ozone/client/io/ECBlockInputStream.java |   7 +-
 .../client/io/MultipartCryptoKeyInputStream.java   |   4 +
 .../ozone/client/protocol/ClientProtocol.java      |   4 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  46 +-
 .../checksum/TestReplicatedFileChecksumHelper.java |  39 +-
 .../hadoop/ozone/client/rpc/RpcClientTest.java     | 279 +++++------
 .../hadoop/ozone/client/rpc/TestOzoneKMSUtil.java  |   3 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   4 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  15 +
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 130 ++---
 .../apache/hadoop/ozone/om/helpers/DBUpdates.java  |  10 +
 .../hadoop/ozone/om/helpers/OmDirectoryInfo.java   |   7 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  16 +-
 .../ozone/om/helpers/OmKeyLocationInfoGroup.java   |   3 +-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |   4 +-
 .../hadoop/ozone/om/helpers/ServiceInfo.java       |  25 +-
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |   3 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  21 +-
 .../apache/hadoop/ozone/protocolPB/OMPBHelper.java |  37 +-
 .../apache/hadoop/ozone/security/acl/OzoneObj.java |   3 +-
 .../apache/hadoop/ozone/web/utils/OzoneUtils.java  |  18 +-
 .../org/apache/hadoop/ozone/TestOzoneAcls.java     |  10 +-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |   8 +-
 .../hadoop/ozone/om/helpers/TestOmBucketInfo.java  |  18 +-
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |  24 +-
 .../ozone/om/helpers/TestOmMultipartKeyInfo.java   |   5 +-
 .../hadoop/ozone/om/helpers/TestQuotaUtil.java     |   5 +-
 .../ozone/security/acl/TestOzoneObjInfo.java       |   5 +-
 hadoop-ozone/csi/pom.xml                           |   4 +
 hadoop-ozone/datanode/pom.xml                      |   1 +
 hadoop-ozone/dev-support/checks/_lib.sh            |   2 +-
 .../dev-support/checks/_mvn_unit_report.sh         |  24 +-
 hadoop-ozone/dev-support/checks/acceptance.sh      |   3 +-
 hadoop-ozone/dev-support/checks/integration.sh     |   2 +-
 hadoop-ozone/dev-support/checks/rat.sh             |   4 +-
 hadoop-ozone/dist/pom.xml                          |   2 +-
 .../dist/src/main/compose/ozone-topology/test.sh   |   6 -
 .../dist/src/main/compose/ozone/docker-config      |   4 +
 .../dist/src/main/compose/ozone/prometheus.yml     |   1 +
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |   4 +
 .../src/main/compose/ozonesecure/docker-config     |   4 +
 .../dist/src/main/compose/ozonesecure/test.sh      |   2 +-
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  98 +++-
 .../dist/src/main/compose/upgrade/testlib.sh       |   9 +-
 .../non-rolling-upgrade/1.1.0-1.2.0/callback.sh    |   6 +
 .../dist/src/main/compose/xcompat/clients.yaml     |  18 +
 hadoop-ozone/dist/src/main/compose/xcompat/test.sh |  17 +-
 hadoop-ozone/dist/src/main/k8s/README.md           |  68 +++
 .../k8s/definitions/ozone-csi/csi-controller.yaml  |   2 +-
 .../main/k8s/examples/getting-started/Flekszible   |   2 +
 .../examples/getting-started/config-configmap.yaml |   1 +
 .../kustomization.yaml}                            |  30 +-
 .../dist/src/main/k8s/examples/minikube/Flekszible |   2 +
 .../k8s/examples/minikube/config-configmap.yaml    |   1 +
 .../Flekszible => minikube/kustomization.yaml}     |  30 +-
 .../src/main/k8s/examples/ozone-dev/Flekszible     |   1 +
 .../k8s/examples/ozone-dev/config-configmap.yaml   |   1 +
 .../examples/ozone-dev/kustomization.yaml}         |  42 +-
 .../dist/src/main/k8s/examples/ozone-ha/Flekszible |   3 +
 .../ozone-ha/{Flekszible => kustomization.yaml}    |  26 +-
 .../dist/src/main/k8s/examples/ozone/Flekszible    |   3 +-
 .../main/k8s/examples/ozone/config-configmap.yaml  |   1 +
 .../Flekszible => ozone/kustomization.yaml}        |  26 +-
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   1 +
 .../main/smoketest/auditparser/auditparser.robot   |  22 +-
 .../dist/src/main/smoketest/basic/basic.robot      |   9 +-
 .../debug/ozone-debug-corrupt-block.robot          |  49 ++
 .../ozone-debug-dead-datanode.robot}               |  35 +-
 .../debug/ozone-debug-stale-datanode.robot         |  48 ++
 .../main/smoketest/debug/ozone-debug-tests.robot   |  51 ++
 .../src/main/smoketest/debug/ozone-debug.robot     |  93 +++-
 .../dist/src/main/smoketest/freon/generate.robot   |  19 +-
 .../dist/src/main/smoketest/freon/remove.robot     |  21 +-
 .../dist/src/main/smoketest/freon/validate.robot   |  13 +-
 .../dist/src/main/smoketest/omha/om-prepare.robot  |   3 +-
 .../dist/src/main/smoketest/ozone-lib/freon.robot  |  65 +++
 .../dist/src/main/smoketest/recon/recon-api.robot  |   6 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |  21 +-
 .../dist/src/main/smoketest/s3/bucketlist.robot    |   7 +-
 .../dist/src/main/smoketest/s3/commonawslib.robot  |  36 ++
 .../dist/src/main/smoketest/spnego/web.robot       |   4 -
 .../dist/src/main/smoketest/upgrade/generate.robot |   5 +-
 .../fault-injection-test/mini-chaos-tests/pom.xml  |  25 +
 .../hadoop/ozone/TestAllMiniChaosOzoneCluster.java |   2 +-
 .../hadoop/ozone/insight/TestBaseInsightPoint.java |   7 +-
 hadoop-ozone/integration-test/pom.xml              |   5 +
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |  26 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   4 +-
 .../fs/ozone/TestOzoneFileSystemWithLinks.java     |  14 +-
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |   2 +-
 .../hadoop/hdds/scm/TestRatisPipelineLeader.java   |  23 +-
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   7 +-
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    |  19 +-
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    |   5 +-
 .../TestContainerStateManagerIntegration.java      |  44 +-
 .../metrics/TestSCMContainerManagerMetrics.java    |   4 +-
 .../hdds/scm/pipeline/TestLeaderChoosePolicy.java  |  28 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   2 +-
 .../hadoop/hdds/scm/pipeline/TestNodeFailure.java  |   2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  29 +-
 .../TestRatisPipelineCreateAndDestroy.java         |  31 +-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   6 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   7 +-
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       |  33 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   8 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  20 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   2 +-
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |   5 +-
 ...OutputUtil.java => StandardOutputTestBase.java} |   2 +-
 .../hadoop/ozone/TestContainerOperations.java      |  35 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  36 +-
 .../hadoop/ozone/TestMiniOzoneOMHACluster.java     |   8 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   6 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  13 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |   5 +-
 .../ozone/client/TestOzoneClientFactory.java       |  75 +++
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |   7 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  23 +-
 ...estBlockOutputStreamWithFailuresFlushDelay.java |  23 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  14 +-
 .../client/rpc/TestContainerStateMachine.java      |   4 +-
 .../TestContainerStateMachineFailureOnRead.java    |  21 +-
 .../rpc/TestContainerStateMachineFailures.java     |  13 +-
 .../rpc/TestContainerStateMachineFlushDelay.java   |  14 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  10 +-
 .../ozone/client/rpc/TestECKeyOutputStream.java    |   8 +-
 .../client/rpc/TestFailureHandlingByClient.java    | 144 +++++-
 .../rpc/TestFailureHandlingByClientFlushDelay.java |   8 +-
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  10 +-
 .../client/rpc/TestOzoneAtRestEncryption.java      |  28 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java       |  18 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  41 +-
 .../rpc/TestOzoneRpcClientForAclAuditLog.java      |   3 +-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  10 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  20 +-
 .../ozone/client/rpc/TestWatchForCommit.java       |  12 +-
 .../rpc/read/TestBlockInputStreamFactoryImpl.java  |   2 +-
 .../ozone/client/rpc/read/TestInputStreamBase.java |   4 +-
 .../ozone/client/rpc/read/TestKeyInputStream.java  |   2 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |   8 +-
 .../commandhandler/TestBlockDeletion.java          |  21 +-
 .../TestCloseContainerByPipeline.java              |  24 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +-
 .../commandhandler/TestDeleteContainerHandler.java |   2 +-
 ...ler.java => TestRefreshVolumeUsageHandler.java} | 121 +++--
 .../transport/server/ratis/TestCSMMetrics.java     |   3 +-
 .../container/ozoneimpl/TestOzoneContainer.java    | 154 +++---
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |   6 +-
 .../container/server/TestContainerServer.java      |  23 +-
 .../server/TestSecureContainerServer.java          |  10 -
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |  15 +-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |  16 +-
 .../hadoop/ozone/om/TestAddRemoveOzoneManager.java |  19 +-
 .../apache/hadoop/ozone/om/TestBucketOwner.java    |  27 +-
 .../ozone/om/TestContainerReportWithKeys.java      |   7 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  36 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |   2 -
 .../apache/hadoop/ozone/om/TestObjectStore.java    |  12 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |  82 +++-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |  11 +-
 .../org/apache/hadoop/ozone/om/TestOmInit.java     |   7 +-
 .../org/apache/hadoop/ozone/om/TestOmLDBCli.java   |   5 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   4 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  25 +-
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    |   2 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |  40 +-
 .../hadoop/ozone/om/TestOzoneManagerHAWithACL.java |   2 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |  62 +--
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   4 +-
 .../ozone/om/TestOzoneManagerListVolumes.java      |   2 -
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   |  14 +-
 .../hadoop/ozone/om/TestOzoneManagerRestart.java   |  49 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |   2 +-
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   9 +-
 .../hadoop/ozone/recon/TestReconAsPassiveScm.java  |   8 +-
 .../hadoop/ozone/recon/TestReconScmSnapshot.java   |   2 +-
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |   2 +-
 .../ozone/recon/TestReconWithOzoneManager.java     |   7 +-
 .../ozone/recon/TestReconWithOzoneManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestFailoverWithSCMHA.java    |   5 +-
 .../TestSCMContainerPlacementPolicyMetrics.java    |  18 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    |   4 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   4 +-
 .../ozone/scm/TestStorageContainerManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    |   2 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |  27 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   2 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |   4 +-
 hadoop-ozone/interface-client/pom.xml              |   4 +
 .../src/main/proto/OmClientProtocol.proto          |   3 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   9 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   6 +-
 .../ozone/om/codec/RepeatedOmKeyInfoCodec.java     |   6 +-
 .../hadoop/ozone/om/codec/TestOmKeyInfoCodec.java  |   4 +-
 .../om/codec/TestOmMultipartKeyInfoCodec.java      |   3 +-
 .../ozone/om/codec/TestRepeatedOmKeyInfoCodec.java |   3 +-
 hadoop-ozone/ozone-manager/pom.xml                 |   5 +
 .../org/apache/hadoop/ozone/om/BucketManager.java  |  22 -
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  | 255 ----------
 .../hadoop/ozone/om/DirectoryDeletingService.java  |   7 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   9 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 113 +++--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  20 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  29 +-
 .../hadoop/ozone/om/OpenKeyCleanupService.java     |   3 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  10 +-
 .../apache/hadoop/ozone/om/OzoneManagerUtils.java  |  22 +
 .../hadoop/ozone/om/OzonePrefixPathImpl.java       |  20 +
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |   1 +
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  10 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  15 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  15 +-
 .../om/request/bucket/OMBucketCreateRequest.java   |  12 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |   9 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |   4 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |   5 +-
 .../ozone/om/request/file/OMFileRequest.java       |   4 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  52 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  24 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |   2 +-
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |   2 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  40 +-
 .../om/request/key/OMOpenKeysDeleteRequest.java    |  42 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   2 +-
 .../om/request/key/acl/OMKeyAclRequestWithFSO.java |   2 +-
 .../S3MultipartUploadCompleteRequest.java          |  64 ++-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |  12 +-
 .../security/OMCancelDelegationTokenRequest.java   |  42 +-
 .../security/OMGetDelegationTokenRequest.java      |  54 ++-
 .../security/OMRenewDelegationTokenRequest.java    |  51 +-
 .../validation/RequestFeatureValidator.java        |  99 ++++
 .../request/validation/RequestProcessingPhase.java |  36 +-
 .../om/request/validation/RequestValidations.java  | 107 +++++
 .../om/request/validation/ValidationCondition.java |  55 +++
 .../om/request/validation/ValidationContext.java   |  52 ++
 .../om/request/validation/ValidatorRegistry.java   | 201 ++++++++
 .../ozone/om/request/validation/package-info.java  |  62 +++
 .../om/response/bucket/OMBucketCreateResponse.java |   3 +-
 .../om/response/bucket/OMBucketDeleteResponse.java |   3 +-
 .../response/file/OMFileCreateResponseWithFSO.java |   4 +-
 .../om/response/key/OMAllocateBlockResponse.java   |   3 +-
 .../key/OMAllocateBlockResponseWithFSO.java        |   3 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |   4 +-
 .../response/key/OMKeyCommitResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyCreateResponse.java |   3 +-
 .../response/key/OMKeyCreateResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |   3 +-
 .../response/key/OMKeyDeleteResponseWithFSO.java   |   3 +-
 .../response/key/OMKeyRenameResponseWithFSO.java   |   3 +-
 .../om/response/key/OMKeysDeleteResponse.java      |   4 +-
 .../om/response/key/OMKeysRenameResponse.java      |  10 +
 .../om/response/key/OMOpenKeysDeleteResponse.java  |  18 +-
 .../multipart/S3MultipartUploadAbortResponse.java  |   9 +-
 .../S3MultipartUploadCommitPartResponse.java       |   9 +-
 .../S3MultipartUploadCompleteResponse.java         |  36 +-
 .../S3MultipartUploadCompleteResponseWithFSO.java  |   9 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  25 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   2 +
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |  11 +
 .../ozone/om/ScmBlockLocationTestingClient.java    |   2 +-
 .../hadoop/ozone/om/TestBucketManagerImpl.java     | 243 +++++-----
 .../hadoop/ozone/om/TestKeyDeletingService.java    |   2 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  14 +-
 .../hadoop/ozone/om/TestOmMetadataManager.java     |  50 +-
 .../hadoop/ozone/om/TestOzoneConfigUtil.java       |   2 +-
 .../hadoop/ozone/om/TestOzoneManagerStarter.java   |   4 +-
 .../apache/hadoop/ozone/om/TestTrashService.java   |   4 +-
 .../ozone/om/request/OMRequestTestUtils.java       |  16 +
 .../bucket/TestOMBucketCreateRequestWithFSO.java   |   4 +
 .../bucket/TestOMBucketDeleteRequestWithFSO.java   |  76 +++
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |  97 +++-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   2 +-
 .../request/key/TestOMOpenKeysDeleteRequest.java   | 174 ++++---
 .../TestS3MultipartUploadCompleteRequest.java      |  32 +-
 .../security/TestOMGetDelegationTokenRequest.java  |  10 +-
 .../TestRequestFeatureValidatorProcessor.java      | 524 +++++++++++++++++++++
 .../request/validation/TestRequestValidations.java | 349 ++++++++++++++
 .../request/validation/TestValidatorRegistry.java  | 215 +++++++++
 .../GeneralValidatorsForTesting.java               | 190 ++++++++
 .../ValidatorsForOnlyOldClientValidations.java     |  43 ++
 .../om/response/key/TestOMKeyDeleteResponse.java   |   3 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |  61 ++-
 .../s3/multipart/TestS3MultipartResponse.java      |  19 +-
 ...stS3MultipartUploadCompleteResponseWithFSO.java |  17 +-
 .../ozone/security/TestAWSV4AuthValidator.java     |   2 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |   8 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |   2 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java |   2 +-
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |   2 +-
 .../hadoop/fs/ozone/TestOzoneClientUtils.java      |   4 +-
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |   1 +
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |   3 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |   1 +
 hadoop-ozone/ozonefs-shaded/pom.xml                |   5 +
 hadoop-ozone/recon-codegen/pom.xml                 |   4 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |   2 +
 .../org/apache/hadoop/ozone/recon/ReconServer.java | 125 +++++
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  29 ++
 .../ozone/recon/api/ClusterStateEndpoint.java      |  15 +-
 .../recon/metrics/OzoneManagerSyncMetrics.java     |  12 +
 .../recon/metrics/ReconTaskStatusMetrics.java      |  83 ++++
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |  18 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |   4 +-
 .../hadoop/ozone/recon/scm/ReconStorageConfig.java |  61 ++-
 .../scm/ReconStorageContainerManagerFacade.java    |  11 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  14 +-
 .../impl/StorageContainerServiceProviderImpl.java  |  20 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |  12 +-
 .../ozone/recon/api/TestContainerEndpoint.java     |   2 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |   7 +-
 .../ozone/recon/api/TestOpenContainerCount.java    |  16 +-
 .../recon/fsck/TestContainerHealthStatus.java      |   4 +-
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   6 +-
 .../TestContainerHealthTaskRecordGenerator.java    |   3 +-
 .../recovery/TestReconOmMetadataManagerImpl.java   |   4 +-
 .../scm/AbstractReconContainerManagerTest.java     |  20 +-
 .../ozone/recon/scm/TestReconContainerManager.java |   2 +-
 .../ozone/recon/scm/TestReconNodeManager.java      |  12 +-
 .../ozone/recon/scm/TestReconPipelineManager.java  |  15 +-
 .../impl/TestReconNamespaceSummaryManagerImpl.java |   6 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |   4 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   4 +-
 .../ozone/recon/tasks/TestOMDBUpdatesHandler.java  |   4 +-
 .../java/org/apache/hadoop/ozone/s3/Gateway.java   |   9 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  24 +-
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |  13 +
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |  12 +-
 .../ozone/s3/endpoint/ListBucketResponse.java      |   6 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  34 +-
 .../hadoop/ozone/s3/endpoint/RootEndpoint.java     |   9 +-
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   9 +
 .../hadoop/ozone/s3/metrics/S3GatewayMetrics.java  | 320 +++++++++++++
 .../hadoop/ozone/s3/metrics/package-info.java      |  23 +-
 .../hadoop/ozone/client/OzoneBucketStub.java       |   2 +-
 .../ozone/s3/metrics/TestS3GatewayMetrics.java     | 113 +++++
 .../apache/hadoop/ozone/s3/util/TestS3Utils.java   |   4 +-
 .../apache/hadoop/ozone/debug/ChunkKeyHandler.java |   3 +-
 .../apache/hadoop/ozone/debug/ReadReplicas.java    |   5 +-
 .../ozone/freon/LeaderAppendLogEntryGenerator.java |   2 +-
 .../apache/hadoop/ozone/freon/OmKeyGenerator.java  |   2 +-
 .../hadoop/ozone/freon/SCMThroughputBenchmark.java |   2 +-
 .../freon/containergenerator/GeneratorOm.java      |   5 +-
 .../freon/containergenerator/GeneratorScm.java     |   4 +-
 .../apache/hadoop/ozone/fsck/ContainerMapper.java  |   5 +-
 .../apache/hadoop/ozone/freon/TestProgressBar.java |   6 +-
 pom.xml                                            |  85 +++-
 580 files changed, 11454 insertions(+), 4601 deletions(-)

diff --cc hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 2e49f02,a6f22d4..07a444a
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@@ -212,10 -219,9 +212,10 @@@ public class BlockInputStream extends B
    protected List<ChunkInfo> getChunkInfos() throws IOException {
      // irrespective of the container state, we will always read via Standalone
      // protocol.
 -    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
 +    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
 +        .getType() != HddsProtos.ReplicationType.EC) {
        pipeline = Pipeline.newBuilder(pipeline)
-           .setReplicationConfig(new StandaloneReplicationConfig(
+           .setReplicationConfig(StandaloneReplicationConfig.getInstance(
                ReplicationConfig
                    .getLegacyFactor(pipeline.getReplicationConfig())))
            .build();
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 1e2264b,0071924..e99b6a5
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@@ -25,7 -25,7 +25,8 @@@ import java.time.Instant
  import java.util.Arrays;
  import java.util.Comparator;
  
+ import com.fasterxml.jackson.annotation.JsonProperty;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 51c0446,fa87ab2..4b7fda9
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@@ -466,19 -449,12 +454,19 @@@ public final class OzoneConfigKeys 
    public static final String OZONE_CLIENT_TEST_OFS_BUCKET_LAYOUT_DEFAULT =
        "FILE_SYSTEM_OPTIMIZED";
  
-   public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY =
-       "ozone.om.client.protocol.version";
-   // The version of the protocol for Client (S3G/OFS) to OM Communication.
-   // The protocol starts at 2.0.0 and a null or empty value for older versions.
-   public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION = "2.0.0";
+   public static final String OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY =
+       "ozone.client.required.om.version.min";
+ 
+   public static final String OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT =
+       OzoneManagerVersion.S3G_PERSISTENT_CONNECTIONS.name();
  
 +  public static final String
 +      OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_MS =
 +      "ozone.client.bucket.replication.config.refresh.time.ms";
 +  public static final long
 +      OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS =
 +      300 * 1000;
 +
    /**
     * There is no need to instantiate this class.
     */
diff --cc hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
index 4d874da,0000000..9ec19bf
mode 100644,000000..100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
@@@ -1,86 -1,0 +1,86 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +package org.apache.hadoop.hdds.scm.container;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
 +
 +/**
 + * Tests for the ContainerInfo class.
 + */
 +
 +public class TestContainerInfo {
 +
 +  @Test
 +  public void getProtobufMessageEC() throws IOException {
 +    ContainerInfo container =
-         createContainerInfo(new RatisReplicationConfig(THREE));
++        createContainerInfo(RatisReplicationConfig.getInstance(THREE));
 +    HddsProtos.ContainerInfoProto proto = container.getProtobuf();
 +
 +    // No EC Config
 +    Assert.assertFalse(proto.hasEcReplicationConfig());
 +    Assert.assertEquals(THREE, proto.getReplicationFactor());
 +    Assert.assertEquals(RATIS, proto.getReplicationType());
 +
 +    // Reconstruct object from Proto
 +    ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
 +    Assert.assertEquals(RATIS, recovered.getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof RatisReplicationConfig);
 +
 +    // EC Config
 +    container = createContainerInfo(new ECReplicationConfig(3, 2));
 +    proto = container.getProtobuf();
 +
 +    Assert.assertEquals(3, proto.getEcReplicationConfig().getData());
 +    Assert.assertEquals(2, proto.getEcReplicationConfig().getParity());
 +    Assert.assertFalse(proto.hasReplicationFactor());
 +    Assert.assertEquals(EC, proto.getReplicationType());
 +
 +    // Reconstruct object from Proto
 +    recovered = ContainerInfo.fromProtobuf(proto);
 +    Assert.assertEquals(EC, recovered.getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof ECReplicationConfig);
 +    ECReplicationConfig config =
 +        (ECReplicationConfig)recovered.getReplicationConfig();
 +    Assert.assertEquals(3, config.getData());
 +    Assert.assertEquals(2, config.getParity());
 +  }
 +
 +  private ContainerInfo createContainerInfo(ReplicationConfig repConfig) {
 +    ContainerInfo.Builder builder = new ContainerInfo.Builder();
 +    builder.setContainerID(1234)
 +        .setReplicationConfig(repConfig)
 +        .setPipelineID(PipelineID.randomId())
 +        .setState(HddsProtos.LifeCycleState.OPEN)
 +        .setOwner("scm");
 +    return builder.build();
 +  }
 +}
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 72e9b92,ba131ff..fdf44ed
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@@ -681,10 -751,9 +751,10 @@@ public class KeyValueContainer implemen
          .setWriteCount(containerData.getWriteCount())
          .setReadBytes(containerData.getReadBytes())
          .setWriteBytes(containerData.getWriteBytes())
-         .setKeyCount(containerData.getKeyCount())
+         .setKeyCount(containerData.getBlockCount())
          .setUsed(containerData.getBytesUsed())
          .setState(getHddsState())
 +        .setReplicaIndex(containerData.getReplicaIndex())
          .setDeleteTransactionId(containerData.getDeleteTransactionId())
          .setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
          .setOriginNodeId(containerData.getOriginNodeId());
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 0ba9f3b,e1c19f7..a14be22
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@@ -115,9 -113,6 +116,8 @@@ import java.util.Map
  import java.util.Optional;
  import java.util.function.Consumer;
  
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
- import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 +
  /**
   * This class is the client-side translator to translate the requests made on
   * the {@link StorageContainerLocationProtocol} interface to the RPC server
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index b46bc79,9b94e20..dd9f71a
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@@ -110,11 -129,10 +130,9 @@@ public class AbstractContainerReportHan
     * @throws ContainerNotFoundException If the container is not present
     */
    private void updateContainerStats(final DatanodeDetails datanodeDetails,
-                                     final ContainerID containerId,
+                                     final ContainerInfo containerInfo,
                                      final ContainerReplicaProto replicaProto)
        throws ContainerNotFoundException {
-     final ContainerInfo containerInfo = containerManager
-         .getContainer(containerId);
 -    final ContainerID containerId = containerInfo.containerID();
  
      if (isHealthy(replicaProto::getState)) {
        if (containerInfo.getSequenceId() <
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index e62a8b4,1c39efd..e4076d9
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@@ -255,19 -270,13 +272,19 @@@ public class ContainerManagerImpl imple
          .setOwner(owner)
          .setContainerID(containerID.getId())
          .setDeleteTransactionId(0)
 -        .setReplicationFactor(
 -            ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()))
 -        .setReplicationType(pipeline.getType())
 -        .build();
 -    containerStateManager.addContainer(containerInfo);
 +        .setReplicationType(pipeline.getType());
 +
 +    if (pipeline.getReplicationConfig() instanceof ECReplicationConfig) {
 +      containerInfoBuilder.setEcReplicationConfig(
 +          ((ECReplicationConfig) pipeline.getReplicationConfig()).toProto());
 +    } else {
 +      containerInfoBuilder.setReplicationFactor(
 +          ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()));
 +    }
 +
 +    containerStateManager.addContainer(containerInfoBuilder.build());
      scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
-     return containerStateManager.getContainer(containerID.getProtobuf());
+     return containerStateManager.getContainer(containerID);
    }
  
    @Override
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index bba6edb,2168138..19b7a51
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@@ -177,10 -167,9 +177,10 @@@ public class PipelineManagerImpl implem
  
      acquireWriteLock();
      try {
 -      Pipeline pipeline = pipelineFactory.create(replicationConfig);
 +      Pipeline pipeline = pipelineFactory.create(replicationConfig,
 +          excludedNodes, favoredNodes);
        stateManager.addPipeline(pipeline.getProtobufMessage(
-           ClientVersions.CURRENT_VERSION));
+           ClientVersion.CURRENT_VERSION));
        recordMetricsForPipeline(pipeline);
        return pipeline;
      } catch (IOException ex) {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 470136a,d1e97fe..ee0b637
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@@ -155,14 -154,19 +155,28 @@@ public class TestContainerManagerImpl 
          containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size());
      Assert.assertEquals(2, containerManager
          .getContainers(HddsProtos.LifeCycleState.CLOSING).size());
+     containerManager.updateContainerState(cidArray[1],
+         HddsProtos.LifeCycleEvent.QUASI_CLOSE);
+     containerManager.updateContainerState(cidArray[2],
+         HddsProtos.LifeCycleEvent.FINALIZE);
+     containerManager.updateContainerState(cidArray[2],
+         HddsProtos.LifeCycleEvent.CLOSE);
+     Assert.assertEquals(7, containerManager.
+         getContainerStateCount(HddsProtos.LifeCycleState.OPEN));
+     Assert.assertEquals(1, containerManager
+         .getContainerStateCount(HddsProtos.LifeCycleState.CLOSING));
+     Assert.assertEquals(1, containerManager
+         .getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED));
+     Assert.assertEquals(1, containerManager
+         .getContainerStateCount(HddsProtos.LifeCycleState.CLOSED));
    }
 +
 +  @Test
 +  public void testAllocateContainersWithECReplicationConfig() throws Exception {
 +    final ContainerInfo admin = containerManager
 +        .allocateContainer(new ECReplicationConfig(3, 2), "admin");
 +    Assert.assertEquals(1, containerManager.getContainers().size());
 +    Assert.assertNotNull(containerManager.getContainer(admin.containerID()));
 +  }
 +
  }
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 00d96dc,c82006e..523efc0
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@@ -28,13 -26,10 +28,13 @@@ import org.apache.hadoop.hdds.scm.metad
  import org.apache.hadoop.hdds.scm.node.NodeManager;
  import org.apache.hadoop.hdds.utils.db.DBStore;
  import org.apache.hadoop.hdds.utils.db.Table;
- import org.apache.hadoop.ozone.ClientVersions;
+ import org.apache.hadoop.ozone.ClientVersion;
  
  import java.io.IOException;
 +import java.util.ArrayList;
  import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.NavigableSet;
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 3152c39,3a68db1..fd29929
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@@ -224,19 -220,15 +226,21 @@@ public class TestPipelineManagerImpl 
      Assert.assertEquals(Pipeline.PipelineState.DORMANT,
          pipelineStore.get(pipeline.getId()).getPipelineState());
      Assert.assertFalse(pipelineManager
-         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
+         .getPipelines(RatisReplicationConfig
+             .getInstance(ReplicationFactor.THREE),
              Pipeline.PipelineState.OPEN).contains(pipeline));
-     Assert.assertEquals(1, pipelineManager
-         .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
++    Assert.assertEquals(1, pipelineManager.getPipelineCount(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
 +            Pipeline.PipelineState.DORMANT));
  
      pipelineManager.activatePipeline(pipeline.getId());
      Assert.assertTrue(pipelineManager
-         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
+         .getPipelines(RatisReplicationConfig
+             .getInstance(ReplicationFactor.THREE),
              Pipeline.PipelineState.OPEN).contains(pipeline));
-     Assert.assertEquals(1, pipelineManager
-         .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
++    Assert.assertEquals(1, pipelineManager.getPipelineCount(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
 +            Pipeline.PipelineState.OPEN));
      buffer.flush();
      Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
      pipelineManager.close();
@@@ -715,43 -724,6 +736,43 @@@
      assertTrue(containerLogIdx < pipelineLogIdx);
    }
  
 +  @Test
 +  public void testCreatePipelineForRead() throws IOException {
 +    PipelineManager pipelineManager = createPipelineManager(true);
 +    List<DatanodeDetails> dns = nodeManager
 +        .getNodes(NodeStatus.inServiceHealthy())
 +        .stream()
 +        .limit(3)
 +        .collect(Collectors.toList());
 +    Set<ContainerReplica> replicas = createContainerReplicasList(dns);
 +    Pipeline pipeline = pipelineManager.createPipelineForRead(
-         new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE), replicas);
 +    Assert.assertEquals(3, pipeline.getNodes().size());
 +    for (DatanodeDetails dn : pipeline.getNodes())  {
 +      Assert.assertTrue(dns.contains(dn));
 +    }
 +  }
 +
 +  private Set<ContainerReplica> createContainerReplicasList(
 +      List <DatanodeDetails> dns) {
 +    Set<ContainerReplica> replicas = new HashSet<>();
 +    for (DatanodeDetails dn : dns) {
 +      ContainerReplica r = ContainerReplica.newBuilder()
 +          .setBytesUsed(1)
 +          .setContainerID(ContainerID.valueOf(1))
 +          .setContainerState(StorageContainerDatanodeProtocolProtos
 +              .ContainerReplicaProto.State.CLOSED)
 +          .setKeyCount(1)
 +          .setOriginNodeId(UUID.randomUUID())
 +          .setSequenceId(1)
 +          .setReplicaIndex(0)
 +          .setDatanodeDetails(dn)
 +          .build();
 +      replicas.add(r);
 +    }
 +    return replicas;
 +  }
 +
    private void sendPipelineReport(
        DatanodeDetails dn, Pipeline pipeline,
        PipelineReportHandler pipelineReportHandler,
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
index 11ae86c,0000000..8168ce8
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
@@@ -1,95 -1,0 +1,99 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdds.scm.pipeline;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + * Tests for PipelineStateMap.
 + */
 +
 +public class TestPipelineStateMap {
 +
 +  private PipelineStateMap map;
 +
 +  @Before
 +  public void setup() {
 +    map = new PipelineStateMap();
 +  }
 +
 +  @After
 +  public void teardown() throws IOException {
 +  }
 +
 +  @Test
 +  public void testCountPipelines() throws IOException {
 +    Pipeline p;
 +
 +    // Open Stanadlone Pipelines
 +    map.addPipeline(MockPipeline.createPipeline(1));
 +    map.addPipeline(MockPipeline.createPipeline(1));
 +    p = MockPipeline.createPipeline(1);
 +    map.addPipeline(p);
 +    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
 +
 +    // Ratis pipeline
 +    map.addPipeline(MockPipeline.createRatisPipeline());
 +    p = MockPipeline.createRatisPipeline();
 +    map.addPipeline(p);
 +    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
 +
 +    // EC Pipelines
 +    map.addPipeline(MockPipeline.createEcPipeline(
 +        new ECReplicationConfig(3, 2)));
 +    map.addPipeline(MockPipeline.createEcPipeline(
 +        new ECReplicationConfig(3, 2)));
 +    p = MockPipeline.createEcPipeline(new ECReplicationConfig(3, 2));
 +    map.addPipeline(p);
 +    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
 +
-     assertEquals(2, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
++    assertEquals(2, map.getPipelineCount(
++        StandaloneReplicationConfig.getInstance(ONE),
 +        Pipeline.PipelineState.OPEN));
-     assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
++    assertEquals(1, map.getPipelineCount(
++        RatisReplicationConfig.getInstance(THREE),
 +        Pipeline.PipelineState.OPEN));
 +    assertEquals(2, map.getPipelineCount(new ECReplicationConfig(3, 2),
 +        Pipeline.PipelineState.OPEN));
 +
 +    assertEquals(0, map.getPipelineCount(new ECReplicationConfig(6, 3),
 +        Pipeline.PipelineState.OPEN));
 +
-     assertEquals(1, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
++    assertEquals(1, map.getPipelineCount(
++        StandaloneReplicationConfig.getInstance(ONE),
 +        Pipeline.PipelineState.CLOSED));
-     assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
++    assertEquals(1, map.getPipelineCount(
++        RatisReplicationConfig.getInstance(THREE),
 +        Pipeline.PipelineState.CLOSED));
 +    assertEquals(1, map.getPipelineCount(new ECReplicationConfig(3, 2),
 +        Pipeline.PipelineState.CLOSED));
 +  }
 +
 +
 +}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index fa8da28,d2b19fb..c0ce8d4
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@@ -220,17 -219,15 +224,20 @@@ public class TestRatisPipelineProvider 
      List<DatanodeDetails> healthyNodes = nodeManager
          .getNodes(NodeStatus.inServiceHealthy()).stream()
          .limit(3).collect(Collectors.toList());
 +    Set<ContainerReplica> replicas = createContainerReplicas(healthyNodes);
  
      Pipeline pipeline1 = provider.create(
-         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+         RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+         healthyNodes);
      Pipeline pipeline2 = provider.create(
-         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+         RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+         healthyNodes);
 +    Pipeline pipeline3 = provider.createForRead(
-         new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
++        replicas);
  
      Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
 +    Assert.assertEquals(pipeline2.getNodeSet(), pipeline3.getNodeSet());
      cleanup();
    }
  
@@@ -284,31 -281,6 +291,31 @@@
    }
  
    @Test
 +  // Test excluded nodes work correctly. Note that for Ratis, the
 +  // PipelinePlacementPolicy, which Ratis is hardcoded to use, does not consider
 +  // favored nodes.
 +  public void testCreateFactorTHREEPipelineWithExcludedDatanodes()
 +      throws Exception {
 +    init(1);
 +    int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy())
 +        .size();
 +    // Add all but 3 nodes to the exclude list and ensure that the 3 picked
 +    // nodes are not in the excluded list.
 +    List<DatanodeDetails> excludedNodes = nodeManager
 +        .getNodes(NodeStatus.inServiceHealthy()).stream()
 +        .limit(healthyCount - 3).collect(Collectors.toList());
 +
 +    Pipeline pipeline1 = provider.create(
-         new RatisReplicationConfig(ReplicationFactor.THREE), excludedNodes,
-         Collections.EMPTY_LIST);
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
++        excludedNodes, Collections.EMPTY_LIST);
 +
 +    for (DatanodeDetails dn : pipeline1.getNodes()) {
 +      assertFalse(excludedNodes.contains(dn));
 +    }
 +  }
 +
 +
 +  @Test
    public void testCreatePipelinesWhenNotEnoughSpace() throws Exception {
      String expectedErrorSubstring = "Unable to find enough" +
          " nodes that meet the space requirement";
@@@ -319,11 -291,8 +326,11 @@@
      largeContainerConf.set(OZONE_SCM_CONTAINER_SIZE, "100TB");
      init(1, largeContainerConf);
      for (ReplicationFactor factor: ReplicationFactor.values()) {
 +      if (factor == ReplicationFactor.ZERO) {
 +        continue;
 +      }
        try {
-         provider.create(new RatisReplicationConfig(factor));
+         provider.create(RatisReplicationConfig.getInstance(factor));
          Assert.fail("Expected SCMException for large container size with " +
              "replication factor " + factor.toString());
        } catch (SCMException ex) {
@@@ -335,11 -304,8 +342,11 @@@
      largeMetadataConf.set(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, "100TB");
      init(1, largeMetadataConf);
      for (ReplicationFactor factor: ReplicationFactor.values()) {
 +      if (factor == ReplicationFactor.ZERO) {
 +        continue;
 +      }
        try {
-         provider.create(new RatisReplicationConfig(factor));
+         provider.create(RatisReplicationConfig.getInstance(factor));
          Assert.fail("Expected SCMException for large metadata size with " +
              "replication factor " + factor.toString());
        } catch (SCMException ex) {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index b3ce216,0000000..ae997a3
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@@ -1,446 -1,0 +1,446 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.hadoop.hdds.scm.pipeline;
 +
 +import org.apache.hadoop.hdds.HddsConfigKeys;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.hadoop.hdds.scm.container.ContainerID;
 +import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 +import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 +import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 +import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
- import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 +import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
++import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 +import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 +import org.apache.hadoop.hdds.scm.node.NodeManager;
 +import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
 +import org.apache.hadoop.hdds.utils.db.DBStore;
 +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 +import org.apache.ozone.test.GenericTestUtils;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.mockito.Matchers;
 +import org.mockito.Mockito;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.NavigableSet;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
 +import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
 +import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotEquals;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.mockito.Mockito.verify;
 +
 +/**
 + * Tests to validate the WritableECContainerProvider works correctly.
 + */
 +public class TestWritableECContainerProvider {
 +
 +  private static final Logger LOG = LoggerFactory
 +      .getLogger(TestWritableECContainerProvider.class);
 +  private static final String OWNER = "SCM";
 +  private PipelineManager pipelineManager;
 +  private ContainerManager containerManager
 +      = Mockito.mock(ContainerManager.class);
 +  private PipelineChoosePolicy pipelineChoosingPolicy
 +      = new HealthyPipelineChoosePolicy();
 +
 +  private OzoneConfiguration conf;
 +  private DBStore dbStore;
 +  private SCMHAManager scmhaManager;
 +  private NodeManager nodeManager;
 +  private WritableContainerProvider provider;
 +  private ReplicationConfig repConfig;
 +  private int minPipelines;
 +
 +  private Map<ContainerID, ContainerInfo> containers;
 +
 +  @Before
 +  public void setup() throws IOException {
 +    repConfig = new ECReplicationConfig(3, 2);
 +    conf = new OzoneConfiguration();
 +    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
 +        conf.getObject(WritableECContainerProvider
 +            .WritableECContainerProviderConfig.class);
 +    minPipelines = providerConf.getMinimumPipelines();
 +    containers = new HashMap<>();
 +    File testDir = GenericTestUtils.getTestDir(
 +        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
 +    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
 +    dbStore = DBStoreBuilder.createDBStore(
 +        conf, new SCMDBDefinition());
-     scmhaManager = MockSCMHAManager.getInstance(true);
++    scmhaManager = SCMHAManagerStub.getInstance(true);
 +    nodeManager = new MockNodeManager(true, 10);
 +    pipelineManager =
 +        new MockPipelineManager(dbStore, scmhaManager, nodeManager);
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    Mockito.doAnswer(call -> {
 +      Pipeline pipeline = (Pipeline)call.getArguments()[2];
 +      ContainerInfo container = createContainer(pipeline,
 +          repConfig, System.nanoTime());
 +      pipelineManager.addContainerToPipeline(
 +          pipeline.getId(), container.containerID());
 +      containers.put(container.containerID(), container);
 +      return container;
 +    }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
 +        Matchers.anyString(), Matchers.any(Pipeline.class));
 +
 +    Mockito.doAnswer(call ->
 +        containers.get((ContainerID)call.getArguments()[0]))
 +        .when(containerManager).getContainer(Matchers.any(ContainerID.class));
 +
 +  }
 +
 +  @Test
 +  public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
 +      throws IOException {
 +    // The first 5 calls should return a different container
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +
 +    allocatedContainers.clear();
 +    for (int i = 0; i < 20; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // Should have minPipelines containers created
 +    assertEquals(minPipelines,
 +        pipelineManager.getPipelines(repConfig, OPEN).size());
 +    // We should have more than 1 allocatedContainers in the set proving a
 +    // random container is selected each time. Do not check for 5 here as there
 +    // is a reasonable chance that in 20 turns we don't pick all 5 nodes.
 +    assertTrue(allocatedContainers.size() > 2);
 +  }
 +
 +  @Test
 +  public void testPiplineLimitIgnoresExcludedPipelines() throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // We have the min limit of pipelines, but then exclude one. It should use
 +    // one of the existing rather than createing a new one, as the limit is
 +    // checked against all pipelines, not just the filtered list
 +    ExcludeList exclude = new ExcludeList();
 +    PipelineID excludedID = allocatedContainers
 +        .stream().findFirst().get().getPipelineID();
 +    exclude.addPipeline(excludedID);
 +
 +    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
 +    assertNotEquals(excludedID, c.getPipelineID());
 +    assertTrue(allocatedContainers.contains(c));
 +  }
 +
 +  @Test
 +  public void testNewPipelineCreatedIfAllPipelinesExcluded()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // We have the min limit of pipelines, but then exclude one. It should use
 +    // one of the existing rather than creating a new one, as the limit is
 +    // checked against all pipelines, not just the filtered list
 +    ExcludeList exclude = new ExcludeList();
 +    for (ContainerInfo c : allocatedContainers) {
 +      exclude.addPipeline(c.getPipelineID());
 +    }
 +    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
 +    assertFalse(allocatedContainers.contains(c));
 +  }
 +
 +  @Test
 +  public void testNewPipelineCreatedIfAllContainersExcluded()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // We have the min limit of pipelines, but then exclude one. It should use
 +    // one of the existing rather than createing a new one, as the limit is
 +    // checked against all pipelines, not just the filtered list
 +    ExcludeList exclude = new ExcludeList();
 +    for (ContainerInfo c : allocatedContainers) {
 +      exclude.addConatinerId(c.containerID());
 +    }
 +    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
 +    assertFalse(allocatedContainers.contains(c));
 +  }
 +
 +  @Test
 +  public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
-     pipelineManager =
-         new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++    pipelineManager = new MockPipelineManager(
++        dbStore, scmhaManager, nodeManager) {
 +      @Override
 +      public Pipeline createPipeline(ReplicationConfig repConf,
 +          List<DatanodeDetails> excludedNodes,
 +          List<DatanodeDetails> favoredNodes) throws IOException {
 +        throw new IOException("Cannot create pipelines");
 +      }
 +    };
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    ContainerInfo container =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    assertNull(container);
 +  }
 +
 +  @Test
 +  public void testExistingPipelineReturnedWhenNewCannotBeCreated()
 +      throws IOException {
-     pipelineManager =
-         new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++    pipelineManager = new MockPipelineManager(
++        dbStore, scmhaManager, nodeManager) {
 +
 +      private boolean throwError = false;
 +
 +      @Override
 +      public Pipeline createPipeline(ReplicationConfig repConf,
 +          List<DatanodeDetails> excludedNodes,
 +          List<DatanodeDetails> favoredNodes) throws IOException {
 +        if (throwError) {
 +          throw new IOException("Cannot create pipelines");
 +        }
 +        throwError = true;
 +        return super.createPipeline(repConfig);
 +      }
 +    };
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    ContainerInfo container =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    for (int i = 0; i < 5; i++) {
 +      ContainerInfo nextContainer =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertEquals(container, nextContainer);
 +    }
 +  }
 +
 +  @Test
 +  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +    // Update all the containers to make them nearly full, but with enough space
 +    // for an EC block to be striped across them.
 +    for (ContainerInfo c : allocatedContainers) {
 +      c.setUsedBytes(getMaxContainerSize() - 30 * 1024 * 1024);
 +    }
 +
 +    // Get a new container of size 50 and ensure it is one of the original set.
 +    // We ask for a space of 50, but as it is stripped across the EC group it
 +    // will actually need 50 / dataNum space
 +    ContainerInfo newContainer =
 +        provider.getContainer(50 * 1024 * 1024, repConfig, OWNER,
 +            new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertTrue(allocatedContainers.contains(newContainer));
 +    // Now get a new container where there is not enough space in the existing
 +    // and ensure a new container gets created.
 +    newContainer = provider.getContainer(
 +        128 * 1024 * 1024, repConfig, OWNER, new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertFalse(allocatedContainers.contains(newContainer));
 +    // The original pipelines should all be closed, triggered by the lack of
 +    // space.
 +    for (ContainerInfo c : allocatedContainers) {
 +      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
 +      assertEquals(CLOSED, pipeline.getPipelineState());
 +    }
 +  }
 +
 +  @Test
 +  public void testPipelineNotFoundWhenAttemptingToUseExisting()
 +      throws IOException {
 +    // Ensure PM throws PNF exception when we ask for the containers in the
 +    // pipeline
-     pipelineManager =
-         new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++    pipelineManager = new MockPipelineManager(
++        dbStore, scmhaManager, nodeManager) {
 +
 +      @Override
 +      public NavigableSet<ContainerID> getContainersInPipeline(
 +          PipelineID pipelineID) throws IOException {
 +        throw new PipelineNotFoundException("Simulated exception");
 +      }
 +    };
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +    // Now attempt to get a container - any attempt to use an existing with
 +    // throw PNF and then we must allocate a new one
 +    ContainerInfo newContainer =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertFalse(allocatedContainers.contains(newContainer));
 +  }
 +
 +  @Test
 +  public void testContainerNotFoundWhenAttemptingToUseExisting()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +
 +    // Ensure ContainerManager always throws when a container is requested so
 +    // existing pipelines cannot be used
 +    Mockito.doAnswer(call -> {
 +      throw new ContainerNotFoundException();
 +    }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
 +
 +    ContainerInfo newContainer =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertFalse(allocatedContainers.contains(newContainer));
 +
 +    // Ensure all the existing pipelines are closed
 +    for (ContainerInfo c : allocatedContainers) {
 +      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
 +      assertEquals(CLOSED, pipeline.getPipelineState());
 +    }
 +  }
 +
 +  @Test
 +  public void testPipelineOpenButContainerRemovedFromIt() throws IOException {
 +    // This can happen if the container close process is triggered from the DN.
 +    // When tha happens, CM will change the container state to CLOSING and
 +    // remove it from the container list in pipeline Manager.
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +      // Remove the container from the pipeline to simulate closing it
 +      pipelineManager.removeContainerFromPipeline(
 +          container.getPipelineID(), container.containerID());
 +    }
 +    ContainerInfo newContainer = provider.getContainer(
 +        1, repConfig, OWNER, new ExcludeList());
 +    assertFalse(allocatedContainers.contains(newContainer));
 +    for (ContainerInfo c : allocatedContainers) {
 +      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
 +      assertEquals(CLOSED, pipeline.getPipelineState());
 +    }
 +  }
 +
 +  @Test
 +  public void testExcludedNodesPassedToCreatePipelineIfProvided()
 +      throws IOException {
 +    PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManagerSpy, containerManager, pipelineChoosingPolicy);
 +    ExcludeList excludeList = new ExcludeList();
 +
 +    // EmptyList should be passed if there are no nodes excluded.
 +    ContainerInfo container = provider.getContainer(
 +        1, repConfig, OWNER, excludeList);
 +    assertNotNull(container);
 +
 +    verify(pipelineManagerSpy).createPipeline(repConfig,
 +        Collections.emptyList(), Collections.emptyList());
 +
 +    // If nodes are excluded then the excluded nodes should be passed through to
 +    // the create pipeline call.
 +    excludeList.addDatanode(MockDatanodeDetails.randomDatanodeDetails());
 +    List<DatanodeDetails> excludedNodes =
 +        new ArrayList<>(excludeList.getDatanodes());
 +
 +    container = provider.getContainer(
 +        1, repConfig, OWNER, excludeList);
 +    assertNotNull(container);
 +    verify(pipelineManagerSpy).createPipeline(repConfig, excludedNodes,
 +        Collections.emptyList());
 +  }
 +
 +  private ContainerInfo createContainer(Pipeline pipeline,
 +      ReplicationConfig repConf, long containerID) {
 +    return new ContainerInfo.Builder()
 +        .setContainerID(containerID)
 +        .setOwner(OWNER)
 +        .setReplicationConfig(repConf)
 +        .setState(HddsProtos.LifeCycleState.OPEN)
 +        .setPipelineID(pipeline.getId())
 +        .setNumberOfKeys(0)
 +        .setUsedBytes(0)
 +        .setSequenceId(0)
 +        .setDeleteTransactionId(0)
 +        .build();
 +  }
 +
 +  private long getMaxContainerSize() {
 +    return (long)conf.getStorageSize(
 +        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
 +        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
 +  }
 +
 +}
diff --cc hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index 2ea34b9,a7960d8..10b5758
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@@ -246,26 -207,6 +246,26 @@@ public class TestInfoSubCommand 
      return new ContainerWithPipeline(container, pipeline);
    }
  
 +  private ContainerWithPipeline getECContainerWithPipeline() {
 +    Pipeline pipeline = new Pipeline.Builder()
 +        .setState(Pipeline.PipelineState.CLOSED)
 +        .setReplicationConfig(new ECReplicationConfig(3, 2))
 +        .setId(PipelineID.randomId())
 +        .setNodes(datanodes)
 +        .build();
 +
 +    ContainerInfo container = new ContainerInfo.Builder()
 +        .setSequenceId(1)
 +        .setPipelineID(pipeline.getId())
 +        .setUsedBytes(1234)
-         .setReplicationConfig(new RatisReplicationConfig(THREE))
++        .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
 +        .setNumberOfKeys(1)
 +        .setState(CLOSED)
 +        .build();
 +
 +    return new ContainerWithPipeline(container, pipeline);
 +  }
 +
    private List<DatanodeDetails> createDatanodeDetails(int count) {
      List<DatanodeDetails> dns = new ArrayList<>();
      for (int i = 0; i < count; i++) {
diff --cc hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
index 9d70f3a,0000000..acc3bda
mode 100644,000000..100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
@@@ -1,192 -1,0 +1,192 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +package org.apache.hadoop.hdds.scm.cli.pipeline;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.client.ScmClient;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.mockito.Mockito;
 +import picocli.CommandLine;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.PrintStream;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.mockito.Mockito.mock;
 +
 +/**
 + * Tests for the ListPipelineSubCommand class.
 + */
 +public class TestListPipelinesSubCommand {
 +
 +  private ListPipelinesSubcommand cmd;
 +  private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
 +  private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
 +  private final PrintStream originalOut = System.out;
 +  private final PrintStream originalErr = System.err;
 +  private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
 +  private ScmClient scmClient;
 +
 +  @Before
 +  public void setup() throws IOException {
 +    cmd = new ListPipelinesSubcommand();
 +    System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
 +    System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
 +
 +    scmClient = mock(ScmClient.class);
 +    Mockito.when(scmClient.listPipelines())
 +        .thenAnswer(invocation -> createPipelines());
 +  }
 +
 +  @After
 +  public void tearDown() {
 +    System.setOut(originalOut);
 +    System.setErr(originalErr);
 +  }
 +
 +  @Test
 +  public void testAllPipelinesReturnedWithNoFilter() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs();
 +    cmd.execute(scmClient);
 +    Assert.assertEquals(6, outContent.toString(DEFAULT_ENCODING)
 +        .split(System.getProperty("line.separator")).length);
 +  }
 +
 +  @Test
 +  public void testOnlyOpenReturned() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-s", "OPEN");
 +    cmd.execute(scmClient);
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(3, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1, output.indexOf("CLOSED"));
 +  }
 +
 +  @Test(expected = IOException.class)
 +  public void testExceptionIfReplicationWithoutType() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "THREE");
 +    cmd.execute(scmClient);
 +  }
 +
 +  @Test
 +  public void testReplicationAndType() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "THREE", "-t", "RATIS");
 +    cmd.execute(scmClient);
 +
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(2, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1, output.indexOf("EC"));
 +  }
 +
 +  @Test
 +  public void testReplicationAndTypeEC() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "rs-6-3-1024k", "-t", "EC");
 +    cmd.execute(scmClient);
 +
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(1, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1,
 +        output.indexOf("ReplicationConfig: RATIS"));
 +  }
 +
 +  @Test
 +  public void testReplicationAndTypeAndState() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "THREE", "-t", "RATIS", "-s", "OPEN");
 +    cmd.execute(scmClient);
 +
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(1, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1, output.indexOf("CLOSED"));
 +    Assert.assertEquals(-1, output.indexOf("EC"));
 +  }
 +
 +  private List<Pipeline> createPipelines() {
 +    List<Pipeline> pipelines = new ArrayList<>();
-     pipelines.add(createPipeline(
-         new StandaloneReplicationConfig(ONE), Pipeline.PipelineState.OPEN));
-     pipelines.add(createPipeline(
-         new RatisReplicationConfig(THREE), Pipeline.PipelineState.OPEN));
-     pipelines.add(createPipeline(
-         new RatisReplicationConfig(THREE), Pipeline.PipelineState.CLOSED));
++    pipelines.add(createPipeline(StandaloneReplicationConfig.getInstance(ONE),
++        Pipeline.PipelineState.OPEN));
++    pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
++        Pipeline.PipelineState.OPEN));
++    pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
++        Pipeline.PipelineState.CLOSED));
 +
 +    pipelines.add(createPipeline(
 +        new ECReplicationConfig(3, 2), Pipeline.PipelineState.OPEN));
 +    pipelines.add(createPipeline(
 +        new ECReplicationConfig(3, 2), Pipeline.PipelineState.CLOSED));
 +    pipelines.add(createPipeline(
 +        new ECReplicationConfig(6, 3), Pipeline.PipelineState.CLOSED));
 +
 +    return pipelines;
 +  }
 +
 +  private Pipeline createPipeline(ReplicationConfig repConfig,
 +      Pipeline.PipelineState state) {
 +    return new Pipeline.Builder()
 +        .setId(PipelineID.randomId())
 +        .setCreateTimestamp(System.currentTimeMillis())
 +        .setState(state)
 +        .setReplicationConfig(repConfig)
 +        .setNodes(createDatanodeDetails(1))
 +        .build();
 +  }
 +
 +  private List<DatanodeDetails> createDatanodeDetails(int count) {
 +    List<DatanodeDetails> dns = new ArrayList<>();
 +    for (int i = 0; i < count; i++) {
 +      HddsProtos.DatanodeDetailsProto dnd =
 +          HddsProtos.DatanodeDetailsProto.newBuilder()
 +              .setHostName("host" + i)
 +              .setIpAddress("1.2.3." + i + 1)
 +              .setNetworkLocation("/default")
 +              .setNetworkName("host" + i)
 +              .addPorts(HddsProtos.Port.newBuilder()
 +                  .setName("ratis").setValue(5678).build())
 +              .setUuid(UUID.randomUUID().toString())
 +              .build();
 +      dns.add(DatanodeDetails.getFromProtoBuf(dnd));
 +    }
 +    return dns;
 +  }
 +}
 +
 +
diff --cc hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 405182f,0000000..b0e9755
mode 100644,000000..100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@@ -1,419 -1,0 +1,420 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.client.io;
 +
 +import com.google.common.base.Preconditions;
 +import org.apache.hadoop.fs.FSExceptionMessages;
 +import org.apache.hadoop.hdds.client.BlockID;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 +import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.function.Function;
 +
 +/**
 + * Class to read data from an EC Block Group.
 + */
 +public class ECBlockInputStream extends BlockExtendedInputStream {
 +
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(ECBlockInputStream.class);
 +
 +  private final ECReplicationConfig repConfig;
 +  private final int ecChunkSize;
 +  private final long stripeSize;
 +  private final BlockInputStreamFactory streamFactory;
 +  private final boolean verifyChecksum;
 +  private final XceiverClientFactory xceiverClientFactory;
 +  private final Function<BlockID, Pipeline> refreshFunction;
 +  private final OmKeyLocationInfo blockInfo;
 +  private final DatanodeDetails[] dataLocations;
 +  private final BlockExtendedInputStream[] blockStreams;
 +  private final int maxLocations;
 +
 +  private long position = 0;
 +  private boolean closed = false;
 +  private boolean seeked = false;
 +
 +  protected OmKeyLocationInfo getBlockInfo() {
 +    return blockInfo;
 +  }
 +
 +  protected ECReplicationConfig getRepConfig() {
 +    return repConfig;
 +  }
 +
 +  protected DatanodeDetails[] getDataLocations() {
 +    return dataLocations;
 +  }
 +
 +  protected long getStripeSize() {
 +    return stripeSize;
 +  }
 +
 +  /**
 +   * Returns the number of available data locations, taking account of the
 +   * expected number of locations. Eg, if the block is less than 1 EC chunk,
 +   * we only expect 1 data location. If it is between 1 and 2 chunks, we expect
 +   * there to be 2 locations, and so on.
 +   * @param expectedLocations The maximum number of allowed data locations,
 +   *                          depending on the block size.
 +   * @return The number of available data locations.
 +   */
 +  protected int availableDataLocations(int expectedLocations) {
 +    int count = 0;
 +    for (int i = 0; i < repConfig.getData() && i < expectedLocations; i++) {
 +      if (dataLocations[i] != null) {
 +        count++;
 +      }
 +    }
 +    return count;
 +  }
 +
 +  protected int availableParityLocations() {
 +    int count = 0;
 +    for (int i = repConfig.getData();
 +         i < repConfig.getData() + repConfig.getParity(); i++) {
 +      if (dataLocations[i] != null) {
 +        count++;
 +      }
 +    }
 +    return count;
 +  }
 +
 +  public ECBlockInputStream(ECReplicationConfig repConfig,
 +      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
 +      XceiverClientFactory xceiverClientFactory, Function<BlockID,
 +      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
 +    this.repConfig = repConfig;
 +    this.ecChunkSize = repConfig.getEcChunkSize();
 +    this.verifyChecksum = verifyChecksum;
 +    this.blockInfo = blockInfo;
 +    this.streamFactory = streamFactory;
 +    this.xceiverClientFactory = xceiverClientFactory;
 +    this.refreshFunction = refreshFunction;
 +    this.maxLocations = repConfig.getData() + repConfig.getParity();
 +    this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
 +    this.blockStreams =
 +        new BlockExtendedInputStream[repConfig.getRequiredNodes()];
 +
 +    this.stripeSize = (long)ecChunkSize * repConfig.getData();
 +    setBlockLocations(this.blockInfo.getPipeline());
 +  }
 +
 +  public synchronized boolean hasSufficientLocations() {
 +    // The number of locations needed is a function of the EC Chunk size. If the
 +    // block length is <= the chunk size, we should only have location 1. If it
 +    // is greater than the chunk size but less than chunk_size * 2, then we must
 +    // have two locations. If it is greater than chunk_size * data_num, then we
 +    // must have all data_num locations.
 +    // We only consider data locations here.
 +    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
 +    return expectedDataBlocks == availableDataLocations(expectedDataBlocks);
 +  }
 +
 +  protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
 +    return ECBlockInputStreamProxy.expectedDataLocations(rConfig, getLength());
 +  }
 +
 +  /**
 +   * Using the current position, returns the index of the blockStream we should
 +   * be reading from. This is the index in the internal array holding the
 +   * stream reference. The block group index will be one greater than this.
 +   * @return
 +   */
 +  protected int currentStreamIndex() {
 +    return (int)((position / ecChunkSize) % repConfig.getData());
 +  }
 +
 +  /**
 +   * Uses the current position and ecChunkSize to determine which of the
 +   * internal block streams the next read should come from. Also opens the
 +   * stream if it has not been opened already.
 +   * @return BlockInput stream to read from.
 +   */
 +  protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
 +    BlockExtendedInputStream stream = blockStreams[locationIndex];
 +    if (stream == null) {
 +      // To read an EC block, we create a STANDALONE pipeline that contains the
 +      // single location for the block index we want to read. The EC blocks are
 +      // indexed from 1 to N, however the data locations are stored in the
 +      // dataLocations array indexed from zero.
 +      Pipeline pipeline = Pipeline.newBuilder()
-           .setReplicationConfig(new StandaloneReplicationConfig(
++          .setReplicationConfig(StandaloneReplicationConfig.getInstance(
 +              HddsProtos.ReplicationFactor.ONE))
 +          .setNodes(Arrays.asList(dataLocations[locationIndex]))
 +          .setId(PipelineID.randomId())
 +          .setState(Pipeline.PipelineState.CLOSED)
 +          .build();
 +
 +      OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
 +          .setBlockID(blockInfo.getBlockID())
 +          .setLength(internalBlockLength(locationIndex + 1))
 +          .setPipeline(blockInfo.getPipeline())
 +          .setToken(blockInfo.getToken())
 +          .setPartNumber(blockInfo.getPartNumber())
 +          .build();
 +      stream = streamFactory.create(
-           new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
++          StandaloneReplicationConfig.getInstance(
++              HddsProtos.ReplicationFactor.ONE),
 +          blkInfo, pipeline,
 +          blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
 +          refreshFunction);
 +      blockStreams[locationIndex] = stream;
 +    }
 +    return stream;
 +  }
 +
 +  /**
 +   * Returns the length of the Nth block in the block group, taking account of a
 +   * potentially partial last stripe. Note that the internal block index is
 +   * numbered starting from 1.
 +   * @param index - Index number of the internal block, starting from 1
 +   * @return
 +   */
 +  protected long internalBlockLength(int index) {
 +    long lastStripe = blockInfo.getLength() % stripeSize;
 +    long blockSize = (blockInfo.getLength() - lastStripe) / repConfig.getData();
 +    long lastCell = lastStripe / ecChunkSize + 1;
 +    long lastCellLength = lastStripe % ecChunkSize;
 +
 +    if (index > repConfig.getData()) {
 +      // Its a parity block and their size is driven by the size of the
 +      // first block of the block group. All parity blocks have the same size
 +      // as block_1.
 +      index = 1;
 +    }
 +
 +    if (index < lastCell) {
 +      return blockSize + ecChunkSize;
 +    } else if (index == lastCell) {
 +      return blockSize + lastCellLength;
 +    } else {
 +      return blockSize;
 +    }
 +  }
 +
 +  private void setBlockLocations(Pipeline pipeline) {
 +    for (DatanodeDetails node : pipeline.getNodes()) {
 +      int index = pipeline.getReplicaIndex(node);
 +      addBlockLocation(index, node);
 +    }
 +  }
 +
 +  private void addBlockLocation(int index, DatanodeDetails location) {
 +    if (index > maxLocations) {
 +      throw new IndexOutOfBoundsException("The index " + index + " is greater "
 +          + "than the EC Replication Config (" + repConfig + ")");
 +    }
 +    dataLocations[index - 1] = location;
 +  }
 +
 +  protected long blockLength() {
 +    return blockInfo.getLength();
 +  }
 +
 +  protected long remaining() {
 +    return blockLength() - position;
 +  }
 +
 +  /**
 +   * Read from the internal BlockInputStreams one EC cell at a time into the
 +   * strategy buffer. This call may read from several internal BlockInputStreams
 +   * if there is sufficient space in the buffer.
 +   * @param strategy
 +   * @return
 +   * @throws IOException
 +   */
 +  @Override
 +  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
 +      throws IOException {
 +    Preconditions.checkArgument(strategy != null);
 +    checkOpen();
 +
 +    if (remaining() == 0) {
 +      return EOF;
 +    }
 +
 +    int totalRead = 0;
 +    while (strategy.getTargetLength() > 0 && remaining() > 0) {
 +      try {
 +        int currentIndex = currentStreamIndex();
 +        BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
 +        int read = readFromStream(stream, strategy);
 +        totalRead += read;
 +        position += read;
 +      } catch (IOException ioe) {
 +        throw new BadDataLocationException(
 +            dataLocations[currentStreamIndex()], ioe);
 +      }
 +    }
 +    return totalRead;
 +  }
 +
 +  @Override
 +  public synchronized long getRemaining() {
 +    return blockInfo.getLength() - position;
 +  }
 +
 +  @Override
 +  public synchronized long getLength() {
 +    return blockInfo.getLength();
 +  }
 +
 +  @Override
 +  public BlockID getBlockID() {
 +    return blockInfo.getBlockID();
 +  }
 +
 +  protected void seekStreamIfNecessary(BlockExtendedInputStream stream,
 +      long partialChunkSize) throws IOException {
 +    if (seeked) {
 +      // Seek on the underlying streams is performed lazily, as there is a
 +      // possibility a read after a seek may only read a small amount of data.
 +      // Once this block stream has been seeked, we always check the position,
 +      // but in the usual case, where there are no seeks at all, we don't need
 +      // to do this extra work.
 +      long basePosition = (position / stripeSize) * (long)ecChunkSize;
 +      long streamPosition = basePosition + partialChunkSize;
 +      if (streamPosition != stream.getPos()) {
 +        // This ECBlockInputStream has been seeked, so the underlying
 +        // block stream is no longer at the correct position. Therefore we need
 +        // to seek it too.
 +        stream.seek(streamPosition);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Read the most allowable amount of data from the current stream. This
 +   * ensures we don't read past the end of an EC cell or the overall block
 +   * group length.
 +   * @param stream Stream to read from
 +   * @param strategy The ReaderStrategy to read data into
 +   * @return
 +   * @throws IOException
 +   */
 +  private int readFromStream(BlockExtendedInputStream stream,
 +      ByteReaderStrategy strategy)
 +      throws IOException {
 +    long partialPosition = position % ecChunkSize;
 +    seekStreamIfNecessary(stream, partialPosition);
 +    long ecLimit = ecChunkSize - partialPosition;
 +    // Free space in the buffer to read into
 +    long bufLimit = strategy.getTargetLength();
 +    // How much we can read, the lower of the EC Cell, buffer and overall block
 +    // remaining.
 +    int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), remaining());
 +    int actualRead = strategy.readFromBlock(stream, expectedRead);
 +    if (actualRead == -1) {
 +      // The Block Stream reached EOF, but we did not expect it to, so the block
 +      // might be corrupt.
 +      throw new IOException("Expected to read " + expectedRead + " but got EOF"
 +          + " from blockGroup " + stream.getBlockID() + " index "
 +          + currentStreamIndex() + 1);
 +    }
 +    return actualRead;
 +  }
 +
 +  /**
 +   * Verify that the input stream is open.
 +   * @throws IOException if the connection is closed.
 +   */
 +  private void checkOpen() throws IOException {
 +    if (closed) {
 +      throw new IOException(
 +          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Block: "
 +              + blockInfo.getBlockID());
 +    }
 +  }
 +
 +  @Override
 +  public synchronized void close() {
 +    closeStreams();
 +    closed = true;
 +  }
 +
 +  protected synchronized void closeStreams() {
 +    for (int i = 0; i < blockStreams.length; i++) {
 +      if (blockStreams[i] != null) {
 +        try {
 +          blockStreams[i].close();
 +          blockStreams[i] = null;
 +        } catch (IOException e) {
 +          LOG.error("Failed to close stream {}", blockStreams[i], e);
 +        }
 +      }
 +    }
 +    // If the streams have been closed outside of a close() call, then it may
 +    // be due to freeing resources. If they are reopened, then we will need to
 +    // seek the stream to its expected position when the next read is attempted.
 +    seeked = true;
 +  }
 +
 +  @Override
 +  public synchronized void unbuffer() {
 +    for (BlockExtendedInputStream stream : blockStreams) {
 +      if (stream != null) {
 +        stream.unbuffer();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public synchronized void seek(long pos) throws IOException {
 +    checkOpen();
 +    if (pos < 0 || pos >= getLength()) {
 +      if (pos == 0) {
 +        // It is possible for length and pos to be zero in which case
 +        // seek should return instead of throwing exception
 +        return;
 +      }
 +      throw new EOFException(
 +          "EOF encountered at pos: " + pos + " for block: "
 +              + blockInfo.getBlockID());
 +    }
 +    position = pos;
 +    seeked = true;
 +  }
 +
 +  @Override
 +  public synchronized long getPos() {
 +    return position;
 +  }
 +
 +  protected synchronized void setPos(long pos) {
 +    position = pos;
 +  }
 +
 +  @Override
 +  public synchronized boolean seekToNewSource(long l) throws IOException {
 +    return false;
 +  }
- }
++}
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
index a742ab9,650fc91..aa64fd2
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
@@@ -115,52 -111,4 +115,56 @@@ public class TestOmBucketInfo 
      Assert.assertEquals((int) 1, cloneBucketInfo.getAcls().size());
  
    }
 +
 +  @Test
 +  public void getProtobufMessageEC() {
 +    OmBucketInfo omBucketInfo =
 +        OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
 +            .setCreationTime(Time.now()).setIsVersionEnabled(false)
 +            .setStorageType(StorageType.ARCHIVE).setAcls(Collections
 +            .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
 +                "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
 +                OzoneAcl.AclScope.ACCESS))).build();
 +    OzoneManagerProtocolProtos.BucketInfo protobuf = omBucketInfo.getProtobuf();
 +    // No EC Config
 +    Assert.assertFalse(protobuf.hasDefaultReplicationConfig());
 +
 +    // Reconstruct object from Proto
 +    OmBucketInfo recovered = OmBucketInfo.getFromProtobuf(protobuf);
 +    Assert.assertNull(recovered.getDefaultReplicationConfig());
 +
 +    // EC Config
-     omBucketInfo =
-         OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
-             .setCreationTime(Time.now()).setIsVersionEnabled(false)
-             .setStorageType(StorageType.ARCHIVE).setAcls(Collections
-             .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
-                 "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
-                 OzoneAcl.AclScope.ACCESS))).setDefaultReplicationConfig(
++    omBucketInfo = OmBucketInfo.newBuilder()
++        .setBucketName("bucket")
++        .setVolumeName("vol1")
++        .setCreationTime(Time.now())
++        .setIsVersionEnabled(false)
++        .setStorageType(StorageType.ARCHIVE)
++        .setAcls(Collections.singletonList(new OzoneAcl(
++            IAccessAuthorizer.ACLIdentityType.USER,
++            "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
++            OzoneAcl.AclScope.ACCESS)))
++        .setDefaultReplicationConfig(
 +            new DefaultReplicationConfig(ReplicationType.EC,
 +                new ECReplicationConfig(3, 2))).build();
 +    protobuf = omBucketInfo.getProtobuf();
 +
 +    Assert.assertTrue(protobuf.hasDefaultReplicationConfig());
 +    Assert.assertEquals(3,
 +        protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
 +            .getData());
 +    Assert.assertEquals(2,
 +        protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
 +            .getParity());
 +
 +    // Reconstruct object from Proto
 +    recovered = OmBucketInfo.getFromProtobuf(protobuf);
 +    Assert.assertEquals(ReplicationType.EC,
 +        recovered.getDefaultReplicationConfig().getType());
 +    ECReplicationConfig config =
 +        recovered.getDefaultReplicationConfig().getEcReplicationConfig();
 +    Assert.assertNotNull(config);
 +    Assert.assertEquals(3, config.getData());
 +    Assert.assertEquals(2, config.getParity());
 +  }
  }
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
index 4440ce0,3b04ed4..a6a0ec9
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
@@@ -25,9 -23,9 +25,10 @@@ import org.apache.hadoop.hdds.client.St
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
  import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+ import org.apache.hadoop.ozone.ClientVersion;
  import org.apache.hadoop.ozone.OzoneAcl;
  import org.apache.hadoop.ozone.om.helpers.OmKeyInfo.Builder;
 +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
  import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
  import org.apache.hadoop.util.Time;
  import org.junit.Assert;
@@@ -38,10 -36,6 +39,9 @@@ import java.util.Arrays
  import java.util.Collections;
  import java.util.List;
  
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
- import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
  import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
  
  /**
@@@ -51,57 -45,7 +51,57 @@@ public class TestOmKeyInfo 
  
    @Test
    public void protobufConversion() {
-     OmKeyInfo key =
-         createOmKeyInfo(new RatisReplicationConfig(ReplicationFactor.THREE));
 -    OmKeyInfo key = new Builder()
++    OmKeyInfo key = createOmKeyInfo(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
 +
-     OmKeyInfo keyAfterSerialization =
-         OmKeyInfo.getFromProtobuf(key.getProtobuf(CURRENT_VERSION));
++    OmKeyInfo keyAfterSerialization = OmKeyInfo.getFromProtobuf(
++        key.getProtobuf(ClientVersion.CURRENT_VERSION));
 +
 +    Assert.assertEquals(key, keyAfterSerialization);
 +  }
 +
 +  @Test
 +  public void getProtobufMessageEC() {
-     OmKeyInfo key =
-         createOmKeyInfo(new RatisReplicationConfig(ReplicationFactor.THREE));
++    OmKeyInfo key = createOmKeyInfo(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
 +    OzoneManagerProtocolProtos.KeyInfo omKeyProto =
-         key.getProtobuf(CURRENT_VERSION);
++        key.getProtobuf(ClientVersion.CURRENT_VERSION);
 +
 +    // No EC Config
 +    Assert.assertFalse(omKeyProto.hasEcReplicationConfig());
 +    Assert.assertEquals(THREE, omKeyProto.getFactor());
 +    Assert.assertEquals(RATIS, omKeyProto.getType());
 +
 +    // Reconstruct object from Proto
 +    OmKeyInfo recovered = OmKeyInfo.getFromProtobuf(omKeyProto);
 +    Assert.assertEquals(RATIS,
 +        recovered.getReplicationConfig().getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof RatisReplicationConfig);
 +
 +    // EC Config
 +    key = createOmKeyInfo(new ECReplicationConfig(3, 2));
-     omKeyProto = key.getProtobuf(CURRENT_VERSION);
++    omKeyProto = key.getProtobuf(ClientVersion.CURRENT_VERSION);
 +
 +    Assert.assertEquals(3, omKeyProto.getEcReplicationConfig().getData());
 +    Assert.assertEquals(2, omKeyProto.getEcReplicationConfig().getParity());
 +    Assert.assertFalse(omKeyProto.hasFactor());
 +    Assert.assertEquals(EC, omKeyProto.getType());
 +
 +    // Reconstruct object from Proto
 +    recovered = OmKeyInfo.getFromProtobuf(omKeyProto);
 +    Assert.assertEquals(EC,
 +        recovered.getReplicationConfig().getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof ECReplicationConfig);
 +    ECReplicationConfig config =
 +        (ECReplicationConfig) recovered.getReplicationConfig();
 +    Assert.assertEquals(3, config.getData());
 +    Assert.assertEquals(2, config.getParity());
 +  }
 +
 +  private OmKeyInfo createOmKeyInfo(ReplicationConfig replicationConfig) {
 +    return new Builder()
          .setKeyName("key1")
          .setBucketName("bucket")
          .setVolumeName("vol1")
@@@ -211,4 -161,4 +211,4 @@@
          .setPipeline(pipeline)
          .build();
    }
--}
++}
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
index 59aeece,ee850bc..c6f8818
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
@@@ -42,90 -34,22 +42,91 @@@ import static org.junit.Assert.assertNo
  public class TestOmMultipartKeyInfo {
  
    @Test
 -  public void testCopyObject() {
 -    OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo.Builder()
 -        .setUploadID(UUID.randomUUID().toString())
 -        .setCreationTime(Time.now())
 -        .setReplicationConfig(RatisReplicationConfig
 -            .getInstance(HddsProtos.ReplicationFactor.THREE))
 +  public void copyObject() {
 +    for (ReplicationConfig param : replicationConfigs().collect(toList())) {
 +      testCopyObject(param);
 +    }
 +  }
 +
 +  //@ParameterizedTest
 +  //@MethodSource("replicationConfigs")
 +  private void testCopyObject(ReplicationConfig replicationConfig) {
 +    // GIVEN
 +    OmMultipartKeyInfo subject = createSubject()
 +        .setReplicationConfig(replicationConfig)
 +        .build();
 +
 +    // WHEN
 +    OmMultipartKeyInfo copy = subject.copyObject();
 +
 +    // THEN
 +    assertNotSame(subject, copy);
 +    assertEquals(subject, copy);
 +    assertEquals(replicationConfig, copy.getReplicationConfig());
 +  }
 +
 +  @Test
 +  public void protoConversion() {
 +    for (ReplicationConfig param : replicationConfigs().collect(toList())) {
 +      protoConversion(param);
 +    }
 +  }
 +
 +  //@ParameterizedTest
 +  //@MethodSource("replicationConfigs")
 +  private void protoConversion(ReplicationConfig replicationConfig) {
 +    // GIVEN
 +    OmMultipartKeyInfo subject = createSubject()
 +        .setReplicationConfig(replicationConfig)
          .build();
  
 -    OmMultipartKeyInfo cloneMultipartKeyInfo = omMultipartKeyInfo.copyObject();
 +    // WHEN
 +    OzoneManagerProtocolProtos.MultipartKeyInfo proto = subject.getProto();
 +    OmMultipartKeyInfo fromProto = OmMultipartKeyInfo.getFromProto(proto);
 +
 +    // THEN
 +    assertEquals(subject, fromProto);
 +    assertEquals(replicationConfig, fromProto.getReplicationConfig());
 +  }
 +
 +  private static Stream<ReplicationConfig> replicationConfigs() {
 +    return Stream.of(
-         new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
++        StandaloneReplicationConfig.getInstance(
++            HddsProtos.ReplicationFactor.ONE),
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
 +        new ECReplicationConfig(3, 2)
 +    );
 +  }
 +
 +  @Test
 +  public void distinctListOfParts() {
 +    // GIVEN
 +    OmMultipartKeyInfo subject = createSubject().build();
 +    OmMultipartKeyInfo copy = subject.copyObject();
 +
 +    // WHEN
 +    subject.addPartKeyInfo(1, createPart(createKeyInfo()).build());
 +
 +    // THEN
 +    assertEquals(0, copy.getPartKeyInfoMap().size());
 +    assertEquals(1, subject.getPartKeyInfoMap().size());
 +  }
  
 -    Assert.assertEquals(cloneMultipartKeyInfo, omMultipartKeyInfo);
 +  private static OmMultipartKeyInfo.Builder createSubject() {
 +    return new OmMultipartKeyInfo.Builder()
 +        .setUploadID(UUID.randomUUID().toString())
 +        .setCreationTime(Time.now());
 +  }
  
 -    // Just setting dummy values for this test.
 -    omMultipartKeyInfo.addPartKeyInfo(1,
 -        PartKeyInfo.newBuilder().setPartNumber(1).setPartName("/path")
 -            .setPartKeyInfo(KeyInfo.newBuilder()
 +  private static PartKeyInfo.Builder createPart(KeyInfo.Builder partKeyInfo) {
 +    return PartKeyInfo.newBuilder()
 +        .setPartNumber(1)
 +        .setPartName("/path")
 +        .setPartKeyInfo(partKeyInfo);
 +  }
 +
 +  private static KeyInfo.Builder createKeyInfo() {
 +    return KeyInfo.newBuilder()
          .setVolumeName(UUID.randomUUID().toString())
          .setBucketName(UUID.randomUUID().toString())
          .setKeyName(UUID.randomUUID().toString())
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
index d06932a,0000000..558676d
mode 100644,000000..100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
@@@ -1,98 -1,0 +1,99 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.om.helpers;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
++import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +
 +/**
 + * Tests for the QuotaUtil class.
 + */
 +public class TestQuotaUtil {
 +
 +  private static final int ONE_MB = 1024 * 1024;
 +
 +  @Test
 +  public void testRatisThreeReplication() {
-     RatisReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
 +    long replicatedSize =
 +        QuotaUtil.getReplicatedSize(123 * ONE_MB, repConfig);
 +    Assert.assertEquals(123 * ONE_MB * 3, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testRatisOneReplication() {
-     RatisReplicationConfig repConfig = new RatisReplicationConfig(ONE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(ONE);
 +    long replicatedSize =
 +        QuotaUtil.getReplicatedSize(123 * ONE_MB, repConfig);
 +    Assert.assertEquals(123 * ONE_MB, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECFullStripeReplication() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = ONE_MB * 3 * 123; // 123 full stripe
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    Assert.assertEquals(dataSize + 123 * ONE_MB * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialStripeIntoFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = ONE_MB * 3 * 123 + 10; // 123 full stripes, plus 10 bytes
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected is 123 parity stripes, plus another 10 bytes in each parity
 +    Assert.assertEquals(dataSize + 123 * ONE_MB * 2 + 10 * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialStripeBeyondFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    // 123 full stripes, plus 1MB+10 bytes
 +    long dataSize = ONE_MB * 3 * 123 + ONE_MB + 10;
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected is 123 parity stripes, plus another 1MB in each parity
 +    Assert.assertEquals(
 +        dataSize + 123 * ONE_MB * 2 + ONE_MB * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialSingleStripeFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = 10;
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected is 123 parity stripes, plus another 1MB in each parity
 +    Assert.assertEquals(dataSize + 10 * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialSingleBeyondFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = 2 * ONE_MB + 10;
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected is 123 parity stripes, plus another 1MB in each parity
 +    Assert.assertEquals(dataSize + ONE_MB * 2, replicatedSize);
 +  }
 +
 +}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index fea70ac,bf24ae9..4c4bdc6
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@@ -1196,66 -1188,6 +1196,66 @@@ public class TestOzoneFileSystem 
    }
  
    @Test
 +  public void testCreateKeyShouldUseRefreshedBucketReplicationConfig()
 +      throws IOException {
 +    OzoneBucket bucket =
 +        TestDataUtil.createVolumeAndBucket(cluster, bucketLayout);
 +    final TestClock testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
 +
 +    String rootPath = String
 +        .format("%s://%s.%s/", OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
 +            bucket.getVolumeName());
 +
 +    // Set the fs.defaultFS and start the filesystem
 +    Configuration conf = new OzoneConfiguration(cluster.getConf());
 +    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 +    // Set the number of keys to be processed during batch operate.
 +    OzoneFileSystem o3FS = (OzoneFileSystem) FileSystem.get(conf);
 +
 +    //Let's reset the clock to control the time.
 +    ((BasicOzoneClientAdapterImpl) (o3FS.getAdapter())).setClock(testClock);
 +
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key"),
 +        ReplicationType.RATIS);
 +
 +    bucket.setReplicationConfig(new ECReplicationConfig("rs-3-2-1024k"));
 +
 +    //After changing the bucket policy, it should create ec key, but o3fs will
 +    // refresh after some time. So, it will be sill old type.
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key1"),
 +        ReplicationType.RATIS);
 +
 +    testClock.fastForward(300 * 1000 + 1);
 +
 +    //After client bucket refresh time, it should create new type what is
 +    // available on bucket at that moment.
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key2"),
 +        ReplicationType.EC);
 +
 +    // Rechecking the same steps with changing to Ratis again to check the
 +    // behavior is consistent.
 +    bucket.setReplicationConfig(
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE));
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
 +
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key3"),
 +        ReplicationType.EC);
 +
 +    testClock.fastForward(300 * 1000 + 1);
 +
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key4"),
 +        ReplicationType.RATIS);
 +  }
 +
 +  private void createKeyAndAssertKeyType(OzoneBucket bucket,
 +      OzoneFileSystem o3FS, Path keyPath, ReplicationType expectedType)
 +      throws IOException {
 +    o3FS.createFile(keyPath).build().close();
 +    Assert.assertEquals(expectedType.name(),
 +        bucket.getKey(o3FS.pathToKey(keyPath)).getReplicationConfig()
 +            .getReplicationType().name());
 +  }
 +
 +  @Test
    public void testGetTrashRoots() throws IOException {
      String username = UserGroupInformation.getCurrentUser().getShortUserName();
      Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 13802a1,ef561e6..b239621
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -68,15 -67,10 +68,15 @@@ public class TestOzoneConfigurationFiel
          ".duration"); // Deprecated config
      configurationPropsToSkipCompare
          .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
 +    // Currently replication and type configs moved to server side.
 +    configurationPropsToSkipCompare
 +        .add(OzoneConfigKeys.OZONE_REPLICATION);
 +    configurationPropsToSkipCompare
 +        .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE);
      configurationPropsToSkipCompare
-         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY);
+         .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY);
      configurationPropsToSkipCompare
-         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION);
+         .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
      // This property is tested in TestHttpServer2 instead
      xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY);
      addPropertiesNotInXml();
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 610a34c,0000000..4f7881f
mode 100644,000000..100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@@ -1,371 -1,0 +1,371 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.hadoop.ozone.client.rpc;
 +
 +import org.apache.hadoop.conf.StorageUnit;
 +import org.apache.hadoop.hdds.HddsConfigKeys;
 +import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationType;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 +import org.apache.hadoop.ozone.MiniOzoneCluster;
 +import org.apache.hadoop.ozone.OzoneConfigKeys;
 +import org.apache.hadoop.ozone.client.BucketArgs;
 +import org.apache.hadoop.ozone.client.ObjectStore;
 +import org.apache.hadoop.ozone.client.OzoneBucket;
 +import org.apache.hadoop.ozone.client.OzoneClient;
 +import org.apache.hadoop.ozone.client.OzoneClientFactory;
 +import org.apache.hadoop.ozone.client.OzoneKey;
 +import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 +import org.apache.hadoop.ozone.client.OzoneVolume;
 +import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 +import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 +import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 +import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 +import org.apache.hadoop.ozone.container.TestHelper;
 +import org.apache.ozone.test.GenericTestUtils;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 +
 +/**
 + * Tests key output stream.
 + */
 +public class TestECKeyOutputStream {
 +  private static MiniOzoneCluster cluster;
 +  private static OzoneConfiguration conf = new OzoneConfiguration();
 +  private static OzoneClient client;
 +  private static ObjectStore objectStore;
 +  private static int chunkSize;
 +  private static int flushSize;
 +  private static int maxFlushSize;
 +  private static int blockSize;
 +  private static String volumeName;
 +  private static String bucketName;
 +  private static String keyString;
 +  private static int dataBlocks = 3;
 +  private static int inputSize = dataBlocks * chunkSize;
 +  private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
 +
 +  /**
 +   * Create a MiniDFSCluster for testing.
 +   */
 +  @BeforeClass
 +  public static void init() throws Exception {
 +    chunkSize = 1024;
 +    flushSize = 2 * chunkSize;
 +    maxFlushSize = 2 * flushSize;
 +    blockSize = 2 * maxFlushSize;
 +
 +    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
 +    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
 +    clientConfig.setStreamBufferFlushDelay(false);
 +    conf.setFromObject(clientConfig);
 +
 +    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
 +    // If SCM detects dead node too quickly, then container would be moved to
 +    // closed state and all in progress writes will get exception. To avoid
 +    // that, we are just keeping higher timeout and none of the tests depending
 +    // on deadnode detection timeout currently.
 +    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
 +    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
 +    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
 +        TimeUnit.SECONDS);
 +    conf.setTimeDuration(
 +        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
 +        TimeUnit.SECONDS);
 +    conf.setQuietMode(false);
 +    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
 +        StorageUnit.MB);
 +    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
 +        TimeUnit.MILLISECONDS);
 +    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
 +        TimeUnit.SECONDS);
 +    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
 +        .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
 +        .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
 +        .setStreamBufferMaxSize(maxFlushSize)
 +        .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
 +    cluster.waitForClusterToBeReady();
 +    client = OzoneClientFactory.getRpcClient(conf);
 +    objectStore = client.getObjectStore();
 +    keyString = UUID.randomUUID().toString();
 +    volumeName = "testeckeyoutputstream";
 +    bucketName = volumeName;
 +    objectStore.createVolume(volumeName);
 +    objectStore.getVolume(volumeName).createBucket(bucketName);
 +    initInputChunks();
 +  }
 +
 +  /**
 +   * Shutdown MiniDFSCluster.
 +   */
 +  @AfterClass
 +  public static void shutdown() {
 +    if (cluster != null) {
 +      cluster.shutdown();
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateKeyWithECReplicationConfig() throws Exception {
 +    try (OzoneOutputStream key = TestHelper
 +        .createKey(keyString, new ECReplicationConfig(3, 2,
 +              ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
 +            objectStore, volumeName, bucketName)) {
 +      Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateKeyWithOutBucketDefaults() throws Exception {
 +    OzoneVolume volume = objectStore.getVolume(volumeName);
 +    OzoneBucket bucket = volume.getBucket(bucketName);
 +    try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
 +      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        out.write(inputChunks[i]);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateKeyWithBucketDefaults() throws Exception {
 +    String myBucket = UUID.randomUUID().toString();
 +    OzoneVolume volume = objectStore.getVolume(volumeName);
 +    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
 +    bucketArgs.setDefaultReplicationConfig(
 +        new DefaultReplicationConfig(ReplicationType.EC,
 +            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +                chunkSize)));
 +
 +    volume.createBucket(myBucket, bucketArgs.build());
 +    OzoneBucket bucket = volume.getBucket(myBucket);
 +
 +    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
 +      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        out.write(inputChunks[i]);
 +      }
 +    }
 +    byte[] buf = new byte[chunkSize];
 +    try (OzoneInputStream in = bucket.readKey(keyString)) {
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        int read = in.read(buf, 0, chunkSize);
 +        Assert.assertEquals(chunkSize, read);
 +        Assert.assertTrue(Arrays.equals(buf, inputChunks[i]));
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
 +    OzoneBucket bucket = getOzoneBucket();
-     try (OzoneOutputStream out = bucket
-         .createKey("testCreateRatisKeyAndWithECBucketDefaults", 2000,
-             new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
-             new HashMap<>())) {
++    try (OzoneOutputStream out = bucket.createKey(
++        "testCreateRatisKeyAndWithECBucketDefaults", 2000,
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
++        new HashMap<>())) {
 +      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        out.write(inputChunks[i]);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void test13ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(13);
 +  }
 +
 +  @Test
 +  public void test15ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(15);
 +  }
 +
 +  @Test
 +  public void test20ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(20);
 +  }
 +
 +  @Test
 +  public void test21ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(21);
 +  }
 +
 +  public void testMultipleChunksInSingleWriteOp(int numChunks)
 +      throws IOException {
 +    byte[] inputData = getInputBytes(numChunks);
 +    final OzoneBucket bucket = getOzoneBucket();
 +    String keyName = "testMultipleChunksInSingleWriteOp" + numChunks;
 +    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
 +        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +            chunkSize), new HashMap<>())) {
 +      out.write(inputData);
 +    }
 +
 +    validateContent(inputData, bucket, bucket.getKey(keyName));
 +  }
 +
 +  @Test
 +  public void testECContainerKeysCount()
 +      throws IOException, InterruptedException, TimeoutException {
 +    byte[] inputData = getInputBytes(1);
 +    final OzoneBucket bucket = getOzoneBucket();
 +    ContainerOperationClient containerOperationClient =
 +        new ContainerOperationClient(conf);
 +
 +    ECReplicationConfig repConfig = new ECReplicationConfig(
 +        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
 +    // Close all EC pipelines so we must get a fresh pipeline and hence
 +    // container for this test.
 +    PipelineManager pm =
 +        cluster.getStorageContainerManager().getPipelineManager();
 +    for (Pipeline p : pm.getPipelines(repConfig)) {
 +      pm.closePipeline(p, true);
 +    }
 +
 +    String keyName = UUID.randomUUID().toString();
 +    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
 +        repConfig, new HashMap<>())) {
 +      out.write(inputData);
 +    }
 +    OzoneKeyDetails key = bucket.getKey(keyName);
 +    long currentKeyContainerID =
 +        key.getOzoneKeyLocations().get(0).getContainerID();
 +
 +    GenericTestUtils.waitFor(() -> {
 +      try {
 +        return containerOperationClient.getContainer(currentKeyContainerID)
 +            .getNumberOfKeys() == 1;
 +      } catch (IOException exception) {
 +        Assert.fail("Unexpected exception " + exception);
 +        return false;
 +      }
 +    }, 100, 10000);
 +    validateContent(inputData, bucket, key);
 +  }
 +
 +  private void validateContent(byte[] inputData, OzoneBucket bucket,
 +      OzoneKey key) throws IOException {
 +    try (OzoneInputStream is = bucket.readKey(key.getName())) {
 +      byte[] fileContent = new byte[inputData.length];
 +      Assert.assertEquals(inputData.length, is.read(fileContent));
 +      Assert.assertEquals(new String(inputData, UTF_8),
 +          new String(fileContent, UTF_8));
 +    }
 +  }
 +
 +  private OzoneBucket getOzoneBucket() throws IOException {
 +    String myBucket = UUID.randomUUID().toString();
 +    OzoneVolume volume = objectStore.getVolume(volumeName);
 +    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
 +    bucketArgs.setDefaultReplicationConfig(
 +        new DefaultReplicationConfig(ReplicationType.EC,
 +            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +                chunkSize)));
 +
 +    volume.createBucket(myBucket, bucketArgs.build());
 +    return volume.getBucket(myBucket);
 +  }
 +
 +  private static void initInputChunks() {
 +    for (int i = 0; i < dataBlocks; i++) {
 +      inputChunks[i] = getBytesWith(i + 1, chunkSize);
 +    }
 +  }
 +
 +  private static byte[] getBytesWith(int singleDigitNumber, int total) {
 +    StringBuilder builder = new StringBuilder(singleDigitNumber);
 +    for (int i = 1; i <= total; i++) {
 +      builder.append(singleDigitNumber);
 +    }
 +    return builder.toString().getBytes(UTF_8);
 +  }
 +
 +  @Test
 +  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
 +    int numChunks = 3;
 +    byte[] inputData = getInputBytes(numChunks);
 +    final OzoneBucket bucket = getOzoneBucket();
 +    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
 +    DatanodeDetails nodeToKill = null;
 +    try {
 +      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
 +          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +              chunkSize), new HashMap<>())) {
 +        out.write(inputData);
 +        // Kill a node from first pipeline
 +        nodeToKill =
 +            ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
 +                .get(0).getPipeline().getFirstNode();
 +        cluster.shutdownHddsDatanode(nodeToKill);
 +
 +        out.write(inputData);
 +        // Check the second blockGroup pipeline to make sure that the failed not
 +        // is not selected.
 +        Assert.assertFalse(
 +            ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
 +                .get(1).getPipeline().getNodes().contains(nodeToKill));
 +      }
 +
 +      try (OzoneInputStream is = bucket.readKey(keyName)) {
 +        // We wrote "inputData" twice, so do two reads and ensure the correct
 +        // data comes back.
 +        for (int i = 0; i < 2; i++) {
 +          byte[] fileContent = new byte[inputData.length];
 +          Assert.assertEquals(inputData.length, is.read(fileContent));
 +          Assert.assertEquals(new String(inputData, UTF_8),
 +              new String(fileContent, UTF_8));
 +        }
 +      }
 +    } finally {
 +      cluster.restartHddsDatanode(nodeToKill, true);
 +    }
 +  }
 +
 +  private byte[] getInputBytes(int numChunks) {
 +    byte[] inputData = new byte[numChunks * chunkSize];
 +    for (int i = 0; i < numChunks; i++) {
 +      int start = (i * chunkSize);
 +      Arrays.fill(inputData, start, start + chunkSize - 1,
 +          String.valueOf(i % 9).getBytes(UTF_8)[0]);
 +    }
 +    return inputData;
 +  }
 +
 +}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 6e271fc,6272afa..37dec7c
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@@ -3899,47 -3798,4 +3922,49 @@@ public abstract class TestOzoneRpcClien
      createRequiredForVersioningTest(volumeName, bucketName, keyName, true);
      checkExceptedResultForVersioningTest(volumeName, bucketName, keyName, 2);
    }
 +
 +  @Test
 +  public void testSetECReplicationConfigOnBucket()
 +      throws IOException {
 +    String volumeName = UUID.randomUUID().toString();
 +    store.createVolume(volumeName);
 +    OzoneVolume volume = store.getVolume(volumeName);
 +    OzoneBucket bucket = getBucket(volume);
 +    ReplicationConfig currentReplicationConfig = bucket.getReplicationConfig();
 +    Assert.assertEquals(
-         new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
++        StandaloneReplicationConfig.getInstance(
++            HddsProtos.ReplicationFactor.ONE),
 +        currentReplicationConfig);
 +    ECReplicationConfig ecReplicationConfig =
 +        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024);
 +    bucket.setReplicationConfig(ecReplicationConfig);
 +
 +    // Get the bucket and check the updated config.
 +    bucket = volume.getBucket(bucket.getName());
 +
 +    Assert.assertEquals(ecReplicationConfig, bucket.getReplicationConfig());
 +
 +    RatisReplicationConfig ratisReplicationConfig =
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +    bucket.setReplicationConfig(ratisReplicationConfig);
 +
 +    // Get the bucket and check the updated config.
 +    bucket = volume.getBucket(bucket.getName());
 +
 +    Assert.assertEquals(ratisReplicationConfig, bucket.getReplicationConfig());
 +
 +    //Reset replication config back.
 +    bucket.setReplicationConfig(currentReplicationConfig);
 +  }
 +
 +  private OzoneBucket getBucket(OzoneVolume volume) throws IOException {
 +    String bucketName = UUID.randomUUID().toString();
 +    BucketArgs.Builder builder = BucketArgs.newBuilder();
 +    builder.setVersioning(true).setDefaultReplicationConfig(
 +        new DefaultReplicationConfig(
-             new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE)));
++            StandaloneReplicationConfig.getInstance(
++                HddsProtos.ReplicationFactor.ONE)));
 +    volume.createBucket(bucketName, builder.build());
 +    return volume.getBucket(bucketName);
 +  }
  }
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
index dcc03bc,0000000..8532835
mode 100644,000000..100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
@@@ -1,111 -1,0 +1,111 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.client.rpc.read;
 +
 +import org.apache.hadoop.hdds.client.BlockID;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 +import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
 +import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 +import org.junit.Test;
 +import org.junit.Assert;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +/**
 + * Tests for BlockInputStreamFactoryImpl.
 + */
 +public class TestBlockInputStreamFactoryImpl {
 +
 +  @Test
 +  public void testNonECGivesBlockInputStream() {
 +    BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
 +    ReplicationConfig repConfig =
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +
 +    OmKeyLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
 +        1024 * 1024 * 10);
 +
 +    BlockExtendedInputStream stream =
 +        factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
 +            blockInfo.getToken(), true, null, null);
 +    Assert.assertTrue(stream instanceof BlockInputStream);
 +    Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
 +    Assert.assertEquals(stream.getLength(), blockInfo.getLength());
 +  }
 +
 +  @Test
 +  public void testECGivesECBlockInputStream() {
 +    BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
 +    ReplicationConfig repConfig =
 +        new ECReplicationConfig(3, 2);
 +
 +    OmKeyLocationInfo blockInfo =
 +        createKeyLocationInfo(repConfig, 5, 1024 * 1024 * 10);
 +
 +    BlockExtendedInputStream stream =
 +        factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
 +            blockInfo.getToken(), true, null, null);
 +    Assert.assertTrue(stream instanceof ECBlockInputStreamProxy);
 +    Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
 +    Assert.assertEquals(stream.getLength(), blockInfo.getLength());
 +  }
 +
 +  private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
 +      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
 +
 +    Pipeline pipeline = Pipeline.newBuilder()
 +        .setState(Pipeline.PipelineState.CLOSED)
 +        .setId(PipelineID.randomId())
 +        .setNodes(new ArrayList<>(dnMap.keySet()))
 +        .setReplicaIndexes(dnMap)
 +        .setReplicationConfig(repConf)
 +        .build();
 +
 +    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
 +        .setBlockID(new BlockID(1, 1))
 +        .setLength(blockLength)
 +        .setOffset(0)
 +        .setPipeline(pipeline)
 +        .setPartNumber(0)
 +        .build();
 +    return keyInfo;
 +  }
 +
 +  private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
 +      int nodeCount, long blockLength) {
 +    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
 +    for (int i = 0; i < nodeCount; i++) {
 +      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
 +    }
 +    return createKeyLocationInfo(repConf, blockLength, datanodes);
 +  }
 +
 +}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 7ab6f3f,7f0ab38..07c2433
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@@ -171,14 -169,8 +171,14 @@@ public abstract class TestInputStreamBa
    }
  
    byte[] writeKey(String keyName, int dataLength) throws Exception {
-     ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
 +    return writeKey(keyName, repConfig, dataLength);
 +  }
 +
 +  byte[] writeKey(String keyName, ReplicationConfig repConfig, int dataLength)
 +      throws Exception {
      OzoneOutputStream key = TestHelper.createKey(keyName,
 -        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
 +        repConfig, 0, objectStore, volumeName, bucketName);
  
      byte[] inputData = ContainerTestHelper.getFixedLengthString(
          keyString, dataLength).getBytes(UTF_8);
@@@ -190,15 -182,8 +190,15 @@@
  
    byte[] writeRandomBytes(String keyName, int dataLength)
        throws Exception {
-     ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
 +    return writeRandomBytes(keyName, repConfig, dataLength);
 +  }
 +
 +  byte[] writeRandomBytes(String keyName, ReplicationConfig repConfig,
 +      int dataLength)
 +      throws Exception {
      OzoneOutputStream key = TestHelper.createKey(keyName,
 -        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
 +        repConfig, 0, objectStore, volumeName, bucketName);
  
      byte[] inputData = new byte[dataLength];
      RAND.nextBytes(inputData);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 63e683f,7a4e1c3..dae6e38
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@@ -20,10 -20,14 +20,16 @@@ package org.apache.hadoop.ozone.contain
  
  import java.io.IOException;
  import java.security.MessageDigest;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
  import java.util.concurrent.TimeoutException;
 +
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationType;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index 689214f,cd6af31..4b51107
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@@ -191,10 -179,10 +192,10 @@@ public class TestRandomKeyGenerator 
    }
  
    @Test
-   @org.junit.Ignore("HDDS-5993")
-   public void testCleanObjects() throws Exception {
+   @Flaky("HDDS-5993")
+   public void cleanObjectsTest() throws Exception {
      RandomKeyGenerator randomKeyGenerator =
 -        new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
 +        new RandomKeyGenerator(cluster.getConf());
      randomKeyGenerator.setNumOfVolumes(2);
      randomKeyGenerator.setNumOfBuckets(5);
      randomKeyGenerator.setNumOfKeys(10);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index f56b292,9b9d6d8..7ff190a
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@@ -571,8 -569,6 +571,8 @@@ public class TestOmMetrics 
          .setBucketName(bucketName)
          .setKeyName(keyName)
          .setAcls(Lists.emptyList())
-         .setReplicationConfig(
-             new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE))
++        .setReplicationConfig(RatisReplicationConfig.getInstance(
++            HddsProtos.ReplicationFactor.THREE))
          .build();
    }
    private OmVolumeArgs createVolumeArgs() {
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 54781c2,32dbe04..5e2ef2d
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@@ -175,11 -179,12 +179,11 @@@ public class OMKeyCommitRequestWithFSO 
        // AllocatedBlock. The space occupied by the Key shall be based on
        // the actual Key size, and the total Block size applied before should
        // be subtracted.
 -      long correctedSpace = omKeyInfo.getDataSize() * factor -
 +      long correctedSpace = omKeyInfo.getReplicatedSize() -
-               locationInfoList.size() * scmBlockSize * factor;
+           allocatedLocationInfoList.size() * scmBlockSize * factor;
        // Subtract the size of blocks to be overwritten.
        if (keyToDelete != null) {
 -        correctedSpace -= keyToDelete.getDataSize() *
 -            keyToDelete.getReplicationConfig().getRequiredNodes();
 +        correctedSpace -= keyToDelete.getReplicatedSize();
        }
        omBucketInfo.incrUsedBytes(correctedSpace);
  
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index cb69f27,1b327d2..48bcf46
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@@ -32,14 -32,15 +32,17 @@@ import java.util.Map
  import com.google.common.base.Optional;
  import com.google.common.base.Preconditions;
  import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+ import org.apache.hadoop.ozone.OmUtils;
  import org.apache.hadoop.ozone.OzoneAcl;
  import org.apache.hadoop.ozone.OzoneConsts;
  import org.apache.hadoop.ozone.om.PrefixManager;
  import org.apache.hadoop.ozone.om.ResolvedBucket;
  import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
++import org.apache.hadoop.ozone.om.helpers.BucketLayout;
  import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
  import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
  import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@@ -48,8 -49,8 +51,8 @@@ import org.apache.hadoop.ozone.om.helpe
  import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
  import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
  import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
--import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 +import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
+ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
  import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
  import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
  import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
diff --cc hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
index 7dc07603,0000000..c888ee7
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
@@@ -1,105 -1,0 +1,105 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.hadoop.ozone.om;
 +
 +import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationFactor;
 +import org.apache.hadoop.hdds.client.ReplicationType;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +/**
 + * Tests the server side replication config preference logic.
 + */
 +public class TestOzoneConfigUtil {
 +  private ReplicationConfig ratis3ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +  private HddsProtos.ReplicationType noneType = HddsProtos.ReplicationType.NONE;
 +  private HddsProtos.ReplicationFactor zeroFactor =
 +      HddsProtos.ReplicationFactor.ZERO;
 +  private HddsProtos.ECReplicationConfig clientECReplicationConfig =
 +      new ECReplicationConfig("rs-3-2-1024K").toProto();
 +  private DefaultReplicationConfig bucketECConfig =
 +      new DefaultReplicationConfig(
 +          new ECReplicationConfig(clientECReplicationConfig));
 +
 +  /**
 +   * Tests EC bucket defaults.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasEC() {
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(noneType, zeroFactor,
 +            clientECReplicationConfig, bucketECConfig, ratis3ReplicationConfig);
 +    // Client has no preference, so we should bucket defaults as we passed.
 +    Assert.assertEquals(bucketECConfig.getEcReplicationConfig(),
 +        replicationConfig);
 +  }
 +
 +  /**
 +   * Tests server defaults.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWithNoClientAndBucketDefaults() {
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(noneType, zeroFactor,
 +            clientECReplicationConfig, null, ratis3ReplicationConfig);
 +    // Client has no preference, no bucket defaults, so it should return server
 +    // defaults.
 +    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * Tests client preference of EC.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenClientPassEC() {
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(HddsProtos.ReplicationType.EC,
 +            zeroFactor, clientECReplicationConfig, null,
 +            ratis3ReplicationConfig);
 +    // Client has preference of type EC, no bucket defaults, so it should return
 +    // client preference.
 +    Assert.assertEquals(new ECReplicationConfig("rs-3-2-1024K"),
 +        replicationConfig);
 +  }
 +
 +  /**
 +   * Tests bucket ratis defaults.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasEC3() {
 +    DefaultReplicationConfig ratisBucketDefaults =
 +        new DefaultReplicationConfig(ReplicationType.RATIS,
 +            ReplicationFactor.THREE);
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(noneType, zeroFactor,
 +            clientECReplicationConfig, ratisBucketDefaults,
 +            ratis3ReplicationConfig);
 +    // Client has no preference of type and bucket has ratis defaults, so it
 +    // should return ratis.
 +    Assert.assertEquals(ratisBucketDefaults.getType().name(),
 +        replicationConfig.getReplicationType().name());
 +    Assert.assertEquals(ratisBucketDefaults.getFactor(),
 +        ReplicationFactor.valueOf(replicationConfig.getRequiredNodes()));
 +  }
 +
 +}
diff --cc hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
index f977577,a9d4f98..561cc34
--- a/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
+++ b/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
@@@ -37,16 -29,9 +37,16 @@@ import static org.junit.Assert.assertNu
  import static org.mockito.Mockito.mock;
  
  /**
 - * Unit tests for OzoneClientUtils.
 + * Tests the behavior of OzoneClientUtils APIs.
   */
  public class TestOzoneClientUtils {
 +  private ReplicationConfig ecReplicationConfig =
 +      new ECReplicationConfig("rs-3-2-1024K");
 +  private ReplicationConfig ratis3ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +  private ReplicationConfig ratis1ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
 +
    @Test(expected = IllegalArgumentException.class)
    public void testNegativeLength() throws IOException {
      OzoneVolume volume = mock(OzoneVolume.class);
diff --cc hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 96bfe73,145578b..99a08d6
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@@ -174,27 -169,22 +174,28 @@@ public class ObjectEndpoint extends End
            partNumber, uploadID, body);
      }
  
+     String copyHeader = null, storageType = null;
      try {
-       String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
-       String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+       copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+       storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
  
 -      S3StorageType s3StorageType;
 -      boolean storageTypeDefault;
 +      // Normal put object
 +      OzoneBucket bucket = getBucket(bucketName);
 +      ReplicationConfig clientConfiguredReplicationConfig = null;
 +      String replication = ozoneConfiguration.get(OZONE_REPLICATION);
 +      if (replication != null) {
 +        clientConfiguredReplicationConfig = ReplicationConfig.parse(
 +            ReplicationType.valueOf(ozoneConfiguration
 +                .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
 +            replication, ozoneConfiguration);
 +      }
 +      ReplicationConfig replicationConfig = S3Utils
 +          .resolveS3ClientSideReplicationConfig(storageType,
 +              clientConfiguredReplicationConfig, bucket.getReplicationConfig());
 +      boolean storageTypeDefault = false;
        if (storageType == null || storageType.equals("")) {
 -        s3StorageType = S3StorageType.getDefault(ozoneConfiguration);
          storageTypeDefault = true;
 -      } else {
 -        s3StorageType = toS3StorageType(storageType);
 -        storageTypeDefault = false;
        }
 -      ReplicationType replicationType = s3StorageType.getType();
 -      ReplicationFactor replicationFactor = s3StorageType.getFactor();
  
        if (copyHeader != null) {
          //Copy object, as copy source available.
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 0f42b3d,ad12e65..fcddfde
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@@ -80,7 -79,8 +80,7 @@@ public class OzoneBucketStub extends Oz
        long creationTime) {
      super(volumeName,
          bucketName,
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
 -        StandaloneReplicationConfig
 -            .getInstance(HddsProtos.ReplicationFactor.ONE),
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
          storageType,
          versioning,
          creationTime);
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
index 55f2f38,0000000..6b8c9fc
mode 100644,000000..100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
@@@ -1,143 -1,0 +1,143 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.s3.util;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +/**
 + * Tests the S3Utils APIs.
 + */
 +public class TestS3Utils {
 +  private ReplicationConfig ecReplicationConfig =
 +      new ECReplicationConfig("rs-3-2-1024K");
 +  private ReplicationConfig ratis3ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +  private ReplicationConfig ratis1ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
 +
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
 +            null, ecReplicationConfig);
 +    // Bucket default is EC.
 +    Assert.assertEquals(ecReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket replication is null and it should respect user passed value.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasNull()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
 +            null, null);
 +    // Passed replication is 3 - Ozone mapped replication is ratis THREE
 +    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket replication is null and it should return null if user passed
 +   * value is invalid.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenUserPassedReplicationIsEmpty()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig =
 +        S3Utils.resolveS3ClientSideReplicationConfig("", null, null);
 +    // client configured value also null.
 +    // This API caller should leave the decision to server.
 +    Assert.assertNull(replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket default is non-EC and client side values are not valid, we
 +   * would just return null, so servers can make decision in this case.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsInvalidButBucketDefaultNonEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(null, null,
 +            ratis3ReplicationConfig);
 +    // Configured client config also null.
 +    Assert.assertNull(replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket default is non-EC and client side value is valid, we
 +   * would should return client side valid value.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsValidButBucketDefaultNonEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(
 +            S3StorageType.REDUCED_REDUNDANCY.name(), null,
 +            ratis3ReplicationConfig);
 +    // Passed value is replication one - Ozone mapped value is ratis ONE
 +    Assert.assertEquals(ratis1ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket default is EC and client side value also valid, we would just
 +   * return bucket default EC.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsValidButBucketDefaultEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
 +            ratis3ReplicationConfig, ecReplicationConfig);
 +    // Bucket default is EC
 +    Assert.assertEquals(ecReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket default is non-EC and client side passed value also not valid
 +   * but configured value is valid, we would just return configured value.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsInvalidAndBucketDefaultNonEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(null, ratis3ReplicationConfig,
 +            ratis1ReplicationConfig);
 +    // Configured value is ratis THREE
 +    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket default is non-EC and client side passed value also not valid
 +   * but configured value is valid, we would just return configured value.
 +   */
 +  @Test(expected = OS3Exception.class)
 +  public void testResolveRepConfWhenUserPassedIsInvalid() throws OS3Exception {
 +    S3Utils.resolveS3ClientSideReplicationConfig("INVALID",
 +        ratis3ReplicationConfig, ratis1ReplicationConfig);
 +  }
 +
 +}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org