Posted to commits@ozone.apache.org by ad...@apache.org on 2022/03/25 11:36:34 UTC

[ozone] branch HDDS-3816-ec updated (f386662 -> 9ec90f5)

This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a change to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git.


    from f386662  HDDS-6381. EC: Improve exception message in ByteBufferEncodingState (#3143)
     add 000902d  HDDS-6325. Fix interface ClientProtocol methods typo setThreadLocalS3Auth and clearThreadLocalS3Auth (#3093)
     add f0f344d  HDDS-6311. Fix number of keys displayed in Recon Overview. (#3081)
     add 87ff329  HDDS-6305: Add metrics - number of FSO bucket creates (#3077)
     add 8c5fefe  HDDS-6330. Remove unnecessary duplicate semicolons (#3097)
     add ff0209d  HDDS-6331. Remove toString in debug log parameters within SCMCommonPlacementPolicy (#3098)
     add 2c561d0  HDDS-6283. Change ContainerStateMachine ContainerOpExecutor name (#3055)
     add eb0f56b  HDDS-6324. Do not trigger CI by reopening PR (#3092)
     add c87b059  HDDS-6284. Add BlockDeletingService worker size config (#3056)
     add 51f47fc  HDDS-6314. ConcurrentModificationException getting SCMContainerMetrics (#3101)
     add 53fa44d  HDDS-6290. operational-state and node-state options in datanode list CLI not working correctly (#3105)
     add 91cf777  HDDS-6262. ozone insight log stops working after OM DBUpdates message (#3044)
     add 07fa775  HDDS-6234. Repair containers affected by incorrect used bytes and block count. (#3042)
     add 788ff0f  HDDS-6212. SCM Container DB bootstrap on Recon startup for secure cluster (#3027)
     add ad33ac6  HDDS-6292. Ensure immutable ContainerReplica set is returned from ContainerStateManagerImpl (#3071)
     add 8ad0796  HDDS-6329. New checkstyle: AvoidStarImport (#3096)
     add 7f6e6fb  HDDS-6350. Rename TestStandardOutputUtil to observe naming conventions (#3114)
     add 4b935c5  HDDS-6328. Add metrics - number of FSO bucket deletes (#3107)
     add e848ab7  HDDS-6327. Upgrade acceptance test doesn't collect logs when the test fails (#3106)
     add 68c5ac5  HDDS-6351. [Docs] Update prefix-based FSO documentation. (#3115)
     add 239d889  HDDS-5267. Full Container Report can remove replicas added by an Incremental Report (#2963)
     add f0ee98a  HDDS-6363. Update checkstyle version to 9.3 (#3123)
     add 1c7ff7e  HDDS-6221. Update versions covered by cross-compatibility test (#3017)
     add a195100  HDDS-5359. Incorrect BLOCKCOUNT and BYTESUSED in container DB. (#3034)
     add 5f4c31a  HDDS-6349. IncompleteReadError on get MPU key from TDE bucket (#3116)
     add a53a3a3  HDDS-6235. Empty KeyValueContainers are replicated without chunks directory. (#3052)
     add 7310491  HDDS-5194. Provide kustomize descriptors for Ozone kubernetes examples (#2221)
     add 93631a1  HDDS-6334. Remove ContainerID to Proto to ContainerID conversion in ContainerStateManagerImpl (#3110)
     add 17d86f1  HDDS-6361. Modify docs build flow to replace image tags with shortcodes. (#3122)
     add da93105  HDDS-6369. PARTIAL_RENAME does not update OM DB (#3126)
     add fbce851  HDDS-6374. Fix incorrect queueTime metrics of ReplicationTask (#3135)
     add ebbe9b5  HDDS-6337. [FSO] Disable recursive access check flag for directories with no children. (#3134)
     add 898ccbb  HDDS-6376. Docs: Fix classpath for ofs and o3fs (#3138)
     add 53d25b9  HDDS-6307. Improve processing and memory efficiency of full container reports (#3085)
     add 6e117f6  HDDS-6395. Intermittent failure in TestReconScmHASnapshot.testScmHASnapshot (#3149)
     add 2ed5994  HDDS-6268. Include audit log in acceptance test bundle (#3045)
     add 5f029d0  HDDS-6317: Export ReconTaskStatus as Prometheus metrics (#3088)
     add ba31f8c  HDDS-6333. Add a metric to record sequence number lag between Recon and OM (#3100)
     add e9722b7  HDDS-6401. Fix flaky TestFilePerBlockStrategy.testWriteAndReadChunkMultipleTimes (#3152)
     add d0cde3a  HDDS-5712. make it configurable to trigger refresh datanode usage info before start a new balance iteration (#2944)
     add b83c1f9  HDDS-6244. ContainerBalancer metrics don't show updated values in JMX (#3049)
     add e716c18  HDDS-6321. Avoid refresh pipeline for key lookup in checkAcls (#3103)
     add fe6f060  HDDS-6370. Document connecting IDE to Ozone on Kubernetes (#3132)
     add 94598d1  HDDS-6420. Ratis and Standalone ReplicationConfig should return a static instance (#3165)
     add 5217134  HDDS-6360. Create stack dump on acceptance test failure (#3154)
     add b439802  HDDS-6417 - Fix console output message when container balancer does not start (#3164)
     add bf38696  HDDS-6423. Fix typo in TestTools.md (#3170)
     add 2a1b6ba  HDDS-6379. Not deducting the STANDALONE pipelines when counting pipelines on each datanode to check the pipeline limit (#3158)
     add 58207a2  HDDS-6153. Add simple integration test to the read-replicas debug tool. (#3065)
     add 1c58d79  HDDS-6415 - add Over-Utilized and Under-Utilized DN details in debug log (#3171)
     add 5587023  HDDS-6357. RenameKey request has memory leak (#3121)
     add 2a776a3  HDDS-6306 - Fix picocli warnings in ozone container balancer help (#3162)
     add ea06d26  HDDS-6419. Provide better error message for malformed auth header (#3167)
     add 759ac57  HDDS-6407. Unwrap RemoteException in getClientProtocol (#3157)
     add 933a19c  HDDS-5656. Move old objects to delete table on overwriting multipart objects (#2813)
     add 1c414bf  HDDS-6299. Remove Log4J2 from Ozone FS shaded jars (#3073)
     add a720211  HDDS-6267. Recon is accessing mock classes. (#3084)
     add 27aad3c  HDDS-6391. ClientVersions and DatanodeVersions class to define an enum with version and description (#3155)
     add 7cfd846  HDDS-6251. EC: Smoketest for ozone admin datanode expects exactly 3 nodes (#3182)
     add b01138c  HDDS-6392. Introduce OzoneManagerVersion enum (#3159)
     add bb212de  HDDS-6418. Datanode usage info uses wrong version number (#3173)
     add f826d12  HDDS-6431. Fix usedBytes for FSO bucket (#3178)
     add 5a1babf  HDDS-6439. Minor perf optimization in the version enums (#3189)
     add 4454e6b  HDDS-6095. Skip slow integration tests, allow repeating known flaky tests 5x in CI (#3148)
     add 5f3f9fa  HDDS-4171. Typo in MiniOzoneCluster: DEFAULT_PIPELIME_LIMIT (#3191)
     add c2a67ac  HDDS-6450. [FSO] Fix correctedSpace for usedBytes in commitKey (#3193)
     add 74d92c8  HDDS-6213. Pluggable OzoneManager request handling hooks  (#3104)
     add 909b0d2  HDDS-6079. Replace Freon RK with OCKG/OCKV in acceptance tests (#2916)
     add be2021a  HDDS-6470. Fix TestOzoneManagerHAWithData#testOMRestart() (#3213)
     add ea296fe  HDDS-6468. Set correct resource type for ACL checks in BucketAcl and KeyAcl requests (#3209)
     add 416439d  HDDS-5867. Update quota when deleting open keys (#3206)
     add 97d8ed7  HDDS-6432. Smoketest for read-replicas tool expects exactly 3 datanodes (#3187)
     add 6408198  HDDS-6464. Turn off cross-check fail-fast for CI (#3208)
     add ca24bdb  HDDS-5228 Make OM FailOverProxyProvider work across threads (#3160)
     add d97687c  HDDS-6087. Compute composite CRC file checksum using chunk checksums from DataNodes. (#2937)
     add 4da4cc1  HDDS-6477. Intermittent failure in TestOzoneManagerHAMetadataOnly#testListVolumes (#3219)
     add 61a313f  HDDS-6228. Update config keys for open key cleanup service (#3022)
     add 042aa8b  HDDS-3370. Cleanup old write-path of bucket in OM (#3183)
     add 92995ec  HDDS-6261. OM crashes when trying to overwrite a key during upgrade downgrade testing (#3174)
     add a8ed41e  HDDS-6490. Replace deprecated --no-ansi option in acceptance tests (#3224)
     add a5602a8  HDDS-6480. Add missing CleanupTableInfo to OM responses (#3220)
     add 1baa652  HDDS-6488. Intermittent failure in ozone debug read-replicas with one datanode STALE (#3223)
     add 4ff6300  HDDS-6398. Add audit logs for DelegationToken (#3163)
     add 2f2f525  HDDS-6472. Errors in TestOzoneContainer integration test (#3216)
     add cfb8bae  HDDS-6393. Change the OzoneManager ServiceInfo message to carry the int version from OzoneManagerVersions. (#3211)
     add 9f716b8  HDDS-6482. Refactor Ratis Rpc and TLS config code. (#3222)
     add 75f5501  HDDS-6481. Add metrics for S3 gateway (#3221)
     new 9ec90f5  Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
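
For readers who want to inspect this update locally, the git commands
below show the range of commits the push added (a minimal sketch,
assuming an existing clone of the repository above with "origin"
pointing at it):

    # fetch the updated branch from the ASF mirror
    git fetch origin HDDS-3816-ec

    # list the commits between the old and new branch heads
    git log --oneline f386662..9ec90f5

    # show the new merge commit itself
    git show 9ec90f5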


Summary of changes:
 .github/workflows/post-commit.yml                  |  12 +-
 dev-support/annotations/pom.xml                    | 114 +++++
 .../RequestFeatureValidatorProcessor.java          | 289 ++++++++++++
 .../org/apache/ozone/annotations/package-info.java |   5 +
 .../services/javax.annotation.processing.Processor |  19 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   2 +-
 .../storage/DummyBlockInputStreamWithRetry.java    |   2 +-
 hadoop-hdds/common/pom.xml                         |  13 +-
 ...DatanodeVersions.java => ComponentVersion.java} |  24 +-
 .../org/apache/hadoop/hdds/DatanodeVersion.java    |  71 +++
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   1 -
 .../hadoop/hdds/client/RatisReplicationConfig.java |  34 +-
 .../hadoop/hdds/client/ReplicationConfig.java      |   4 +-
 .../hdds/client/StandaloneReplicationConfig.java   |  34 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  13 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  66 ++-
 .../RequestTypeDependentRetryPolicyCreator.java    |   6 +-
 .../hadoop/hdds/scm/container/ContainerInfo.java   |  21 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |   9 +-
 .../apache/hadoop/hdds/scm/net/InnerNodeImpl.java  |   2 +-
 .../protocol/StorageContainerLocationProtocol.java |  11 +-
 .../org/apache/hadoop/ozone/ClientVersion.java     |  75 +++
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  32 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   4 +
 .../apache/hadoop/ozone/OzoneManagerVersion.java   |  72 +++
 .../common/src/main/resources/ozone-default.xml    |  35 +-
 .../hdds/TestComponentVersionInvariants.java       |  98 ++++
 .../client/TestReplicationConfigValidator.java     |  16 +-
 .../hadoop/hdds/protocol/TestDatanodeDetails.java  |  11 +-
 .../hdds/scm/container/TestContainerInfo.java      |   2 +-
 .../hadoop/hdds/scm/pipeline/MockPipeline.java     |   4 +-
 .../hadoop/hdds/scm/pipeline/TestPipeline.java     |   9 +-
 .../ozone/container/ContainerTestHelper.java       |  68 ---
 .../TestDefaultUpgradeFinalizationExecutor.java    |   3 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   8 +-
 .../ozone/container/common/impl/ContainerData.java |  46 +-
 .../ozone/container/common/impl/ContainerSet.java  |  16 +-
 .../common/interfaces/ContainerInspector.java      |  72 +++
 .../ozone/container/common/interfaces/Handler.java |  12 +-
 .../common/report/ContainerReportPublisher.java    |   5 +-
 .../common/report/IncrementalReportSender.java     |  30 ++
 .../common/statemachine/DatanodeStateMachine.java  |   6 +-
 .../common/statemachine/StateContext.java          | 131 +++---
 .../CreatePipelineCommandHandler.java              |  13 +-
 .../states/datanode/RunningDatanodeState.java      |   8 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |  23 +-
 .../server/ratis/ContainerStateMachine.java        |   7 +-
 .../transport/server/ratis/XceiverServerRatis.java |  18 +-
 .../common/utils/ContainerInspectorUtil.java       |  87 ++++
 .../container/common/volume/AbstractFuture.java    |  13 +-
 .../container/keyvalue/KeyValueContainer.java      |  76 ++-
 .../container/keyvalue/KeyValueContainerData.java  |  10 +-
 .../KeyValueContainerMetadataInspector.java        | 463 ++++++++++++++++++
 .../ozone/container/keyvalue/KeyValueHandler.java  |  95 ++--
 .../container/keyvalue/TarContainerPacker.java     |  55 ++-
 .../container/keyvalue/helpers/ChunkUtils.java     |   4 +
 .../keyvalue/helpers/KeyValueContainerUtil.java    |  23 +-
 .../container/keyvalue/impl/BlockManagerImpl.java  | 104 ++--
 .../keyvalue/impl/ChunkManagerDispatcher.java      |  18 +-
 .../background/BlockDeletingService.java           | 108 +++--
 .../container/metadata/AbstractDatanodeStore.java  |   6 +-
 .../ozone/container/ozoneimpl/ContainerReader.java |   6 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  40 +-
 .../container/replication/MeasuredReplicator.java  |   3 +-
 .../commands/RefreshVolumeUsageCommand.java        |  57 +++
 .../hadoop/ozone/container/common/ScmTestMock.java |   7 +-
 .../container/common/TestBlockDeletingService.java |   6 +-
 .../common/TestKeyValueContainerData.java          |   6 +-
 .../TestSchemaOneBackwardsCompatibility.java       |   2 +-
 .../common/helpers/TestDatanodeVersionFile.java    |   4 +-
 .../impl/TestContainerDeletionChoosingPolicy.java  |   2 +-
 .../common/impl/TestContainerPersistence.java      |  47 --
 .../container/common/impl/TestHddsDispatcher.java  |   7 +-
 .../common/statemachine/TestStateContext.java      |  71 ++-
 .../TestCreatePipelineCommandHandler.java          |  36 +-
 .../common/volume/TestStorageVolumeChecker.java    |  12 +-
 .../keyvalue/TestKeyValueBlockIterator.java        |  10 +-
 .../container/keyvalue/TestKeyValueContainer.java  |  67 ++-
 .../keyvalue/TestKeyValueContainerCheck.java       | 158 +------
 ...a => TestKeyValueContainerIntegrityChecks.java} | 160 ++-----
 .../TestKeyValueContainerMetadataInspector.java    | 360 ++++++++++++++
 .../container/keyvalue/TestKeyValueHandler.java    |  19 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |  14 -
 .../keyvalue/impl/CommonChunkManagerTestCases.java |   1 -
 .../keyvalue/impl/TestBlockManagerImpl.java        |  57 +--
 .../container/ozoneimpl/TestContainerReader.java   |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   6 +-
 .../replication/TestMeasuredReplicator.java        |  15 +
 .../testutils/BlockDeletingServiceTestImpl.java    |   2 +-
 hadoop-hdds/dev-support/checkstyle/checkstyle.xml  |   1 +
 hadoop-hdds/docs/content/concept/Containers.md     |   3 +-
 hadoop-hdds/docs/content/concept/Datanodes.md      |   4 +-
 hadoop-hdds/docs/content/concept/OzoneManager.md   |   6 +-
 hadoop-hdds/docs/content/concept/Recon.md          |   9 +-
 hadoop-hdds/docs/content/feature/OM-HA.md          |   4 +-
 hadoop-hdds/docs/content/feature/PrefixFSO.md      |  68 ++-
 hadoop-hdds/docs/content/feature/SCM-HA.md         |   2 +-
 hadoop-hdds/docs/content/interface/O3fs.md         |   2 +-
 hadoop-hdds/docs/content/interface/O3fs.zh.md      |   4 +-
 hadoop-hdds/docs/content/interface/Ofs.md          |   2 +-
 hadoop-hdds/docs/content/tools/TestTools.md        |   4 +-
 hadoop-hdds/docs/content/tools/TestTools.zh.md     |   4 +-
 hadoop-hdds/docs/dev-support/bin/generate-site.sh  |  12 +-
 .../docs/dev-support/bin/make_images_responsive.py |  57 +++
 .../themes/ozonedoc/layouts/shortcodes/image.html  |   2 +-
 hadoop-hdds/framework/pom.xml                      |   8 +
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |  12 +
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  37 ++
 ...lockLocationProtocolClientSideTranslatorPB.java |   4 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |   9 +-
 .../scm/update/client/UpdateServiceConfig.java     |   5 +-
 .../authority/PKIProfiles/DefaultProfile.java      |  32 +-
 ...ateClient.java => CommonCertificateClient.java} | 116 ++---
 .../certificate/client/OMCertificateClient.java    |  79 +---
 .../client/ReconCertificateClient.java}            |  38 +-
 ...va => FixedThreadPoolWithAffinityExecutor.java} |  70 ++-
 .../hadoop/hdds/utils/db/DBUpdatesWrapper.java     |   9 +
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   1 +
 .../hadoop/hdds/utils/db/cache/EpochEntry.java     |  75 ---
 .../hadoop/hdds/utils/db/cache/FullTableCache.java |  50 +-
 .../hdds/utils/db/cache/PartialTableCache.java     |  56 +--
 .../hadoop/hdds/utils/db/cache/TableCache.java     |   4 +-
 .../client/TestDefaultCertificateClient.java       |   2 +-
 .../hadoop/hdds/server/events/TestEventQueue.java  |  35 +-
 .../hadoop/hdds/server/http/TestHtmlQuoting.java   |   5 +-
 .../hadoop/hdds/utils/db/cache/TestTableCache.java |  77 ++-
 hadoop-hdds/interface-client/pom.xml               |   5 +
 .../src/main/proto/DatanodeClientProtocol.proto    |   4 +-
 .../interface-client/src/main/proto/hdds.proto     |   7 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  11 +-
 .../src/main/proto/ScmServerSecurityProtocol.proto |   8 +-
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |   2 +-
 .../container/AbstractContainerReportHandler.java  |  43 +-
 .../hdds/scm/container/ContainerManager.java       |   7 +
 .../hdds/scm/container/ContainerManagerImpl.java   |  78 +--
 .../hdds/scm/container/ContainerReportHandler.java | 141 ++++--
 .../hdds/scm/container/ContainerStateManager.java  |  10 +-
 .../scm/container/ContainerStateManagerImpl.java   |  24 +-
 .../IncrementalContainerReportHandler.java         |  18 +-
 .../hdds/scm/container/ReplicationManager.java     |   4 +-
 .../scm/container/balancer/ContainerBalancer.java  | 133 ++++--
 .../balancer/ContainerBalancerConfiguration.java   |  23 +-
 .../balancer/ContainerBalancerMetrics.java         | 139 +++---
 .../scm/container/states/ContainerStateMap.java    |  27 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   2 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java |  37 +-
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     |   4 +-
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |   8 +-
 .../hdds/scm/ha/SCMDBCheckpointProvider.java       |   2 +-
 ...ffer.java => SCMHADBTransactionBufferStub.java} |   8 +-
 ...MockSCMHAManager.java => SCMHAManagerStub.java} |  24 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   4 +-
 .../hadoop/hdds/scm/ha/SCMServiceManager.java      |   3 +-
 .../hdds/scm/metadata/MoveDataNodePairCodec.java   |   6 +-
 .../hadoop/hdds/scm/metadata/PipelineCodec.java    |   6 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   9 +-
 .../hadoop/hdds/scm/node/DatanodeUsageInfo.java    |  15 +-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  18 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  18 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  34 +-
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   2 +-
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   4 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |   4 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  54 +--
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  11 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   6 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   3 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  28 ++
 ...inerLocationProtocolServerSideTranslatorPB.java |   9 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |   2 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |   2 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  12 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  20 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  12 +-
 .../hadoop/hdds/scm/server/SCMPolicyProvider.java  |   5 +-
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  16 +-
 .../hadoop/hdds/scm/server/SCMStorageConfig.java   |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |  51 +-
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  14 +-
 .../hadoop/hdds/scm/TestHddsServerUtils.java       |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   7 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |   8 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  21 +
 .../hdds/scm/container/SimpleMockNodeManager.java  |  20 +-
 .../container/TestCloseContainerEventHandler.java  |   8 +-
 .../scm/container/TestContainerManagerImpl.java    |  26 +-
 .../scm/container/TestContainerReportHandler.java  |  34 +-
 .../scm/container/TestContainerStateManager.java   |  14 +-
 .../TestIncrementalContainerReportHandler.java     |  30 +-
 .../hdds/scm/container/TestReplicationManager.java | 140 +++---
 .../scm/container/TestUnknownContainerReport.java  |   4 +-
 .../container/balancer/TestContainerBalancer.java  |  84 ++--
 .../states/TestContainerReplicaCount.java          |   5 +-
 .../hdds/scm/ha/TestReplicationAnnotation.java     |  10 +-
 .../hdds/scm/ha/TestSequenceIDGenerator.java       |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   6 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   7 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   3 +-
 .../hadoop/hdds/scm/node/TestNodeStateManager.java |  24 +
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  12 +-
 .../hdds/scm/node/TestSCMNodeStorageStatMap.java   |   5 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   4 +-
 .../TestPipelineDatanodesIntersection.java         |  13 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 129 ++---
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 158 ++++++-
 .../scm/pipeline/TestPipelineStateManagerImpl.java |  62 +--
 .../hdds/scm/pipeline/TestPipelineStateMap.java    |  12 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  57 ++-
 .../scm/pipeline/TestSimplePipelineProvider.java   |  18 +-
 .../pipeline/TestWritableECContainerProvider.java  |  16 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  20 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |  10 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  22 +-
 .../scm/server/TestSCMBlockProtocolServer.java     |  10 +-
 .../server/TestSCMDatanodeHeartbeatDispatcher.java |  54 +--
 .../server/TestStorageContainerManagerStarter.java |   6 +-
 .../testutils/ReplicationNodeManagerMock.java      |  14 +
 hadoop-hdds/test-utils/pom.xml                     |   5 +
 .../org/apache/ozone/test/GenericTestUtils.java    |  18 +-
 .../java/org/apache/ozone/test/tag/Flaky.java}     |  36 +-
 .../main/java/org/apache/ozone/test/tag/Slow.java} |  35 +-
 .../org/apache/ozone/test/tag/package-info.java}   |  23 +-
 .../scm/cli/ContainerBalancerStartSubcommand.java  |   8 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |   9 +-
 .../hadoop/hdds/scm/cli/cert/CertCommands.java     |   4 +-
 .../hdds/scm/cli/datanode/ListInfoSubcommand.java  |  38 +-
 .../hdds/scm/cli/container/TestInfoSubCommand.java |   6 +-
 .../datanode/TestContainerBalancerSubCommand.java  |   6 +-
 .../cli/pipeline/TestListPipelinesSubCommand.java  |  12 +-
 .../hadoop/ozone/client/OzoneClientFactory.java    |   5 +-
 .../client/checksum/BaseFileChecksumHelper.java    |  54 ++-
 .../checksum/ReplicatedFileChecksumHelper.java     |  44 +-
 .../client/io/BlockOutputStreamEntryPool.java      |   1 -
 .../hadoop/ozone/client/io/ECBlockInputStream.java |   7 +-
 .../client/io/MultipartCryptoKeyInputStream.java   |   4 +
 .../ozone/client/protocol/ClientProtocol.java      |   4 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  46 +-
 .../checksum/TestReplicatedFileChecksumHelper.java |  39 +-
 .../hadoop/ozone/client/rpc/RpcClientTest.java     | 279 +++++------
 .../hadoop/ozone/client/rpc/TestOzoneKMSUtil.java  |   3 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   4 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  15 +
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 130 ++---
 .../apache/hadoop/ozone/om/helpers/DBUpdates.java  |  10 +
 .../hadoop/ozone/om/helpers/OmDirectoryInfo.java   |   7 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  16 +-
 .../ozone/om/helpers/OmKeyLocationInfoGroup.java   |   3 +-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |   4 +-
 .../hadoop/ozone/om/helpers/ServiceInfo.java       |  25 +-
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |   3 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  21 +-
 .../apache/hadoop/ozone/protocolPB/OMPBHelper.java |  37 +-
 .../apache/hadoop/ozone/security/acl/OzoneObj.java |   3 +-
 .../apache/hadoop/ozone/web/utils/OzoneUtils.java  |  18 +-
 .../org/apache/hadoop/ozone/TestOzoneAcls.java     |  10 +-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |   8 +-
 .../hadoop/ozone/om/helpers/TestOmBucketInfo.java  |  18 +-
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |  24 +-
 .../ozone/om/helpers/TestOmMultipartKeyInfo.java   |   5 +-
 .../hadoop/ozone/om/helpers/TestQuotaUtil.java     |   5 +-
 .../ozone/security/acl/TestOzoneObjInfo.java       |   5 +-
 hadoop-ozone/csi/pom.xml                           |   4 +
 hadoop-ozone/datanode/pom.xml                      |   1 +
 hadoop-ozone/dev-support/checks/_lib.sh            |   2 +-
 .../dev-support/checks/_mvn_unit_report.sh         |  24 +-
 hadoop-ozone/dev-support/checks/acceptance.sh      |   3 +-
 hadoop-ozone/dev-support/checks/integration.sh     |   2 +-
 hadoop-ozone/dev-support/checks/rat.sh             |   4 +-
 hadoop-ozone/dist/pom.xml                          |   2 +-
 .../dist/src/main/compose/ozone-topology/test.sh   |   6 -
 .../dist/src/main/compose/ozone/docker-config      |   4 +
 .../dist/src/main/compose/ozone/prometheus.yml     |   1 +
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |   4 +
 .../src/main/compose/ozonesecure/docker-config     |   4 +
 .../dist/src/main/compose/ozonesecure/test.sh      |   2 +-
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  98 +++-
 .../dist/src/main/compose/upgrade/testlib.sh       |   9 +-
 .../non-rolling-upgrade/1.1.0-1.2.0/callback.sh    |   6 +
 .../dist/src/main/compose/xcompat/clients.yaml     |  18 +
 hadoop-ozone/dist/src/main/compose/xcompat/test.sh |  17 +-
 hadoop-ozone/dist/src/main/k8s/README.md           |  68 +++
 .../k8s/definitions/ozone-csi/csi-controller.yaml  |   2 +-
 .../main/k8s/examples/getting-started/Flekszible   |   2 +
 .../examples/getting-started/config-configmap.yaml |   1 +
 .../kustomization.yaml}                            |  30 +-
 .../dist/src/main/k8s/examples/minikube/Flekszible |   2 +
 .../k8s/examples/minikube/config-configmap.yaml    |   1 +
 .../Flekszible => minikube/kustomization.yaml}     |  30 +-
 .../src/main/k8s/examples/ozone-dev/Flekszible     |   1 +
 .../k8s/examples/ozone-dev/config-configmap.yaml   |   1 +
 .../examples/ozone-dev/kustomization.yaml}         |  42 +-
 .../dist/src/main/k8s/examples/ozone-ha/Flekszible |   3 +
 .../ozone-ha/{Flekszible => kustomization.yaml}    |  26 +-
 .../dist/src/main/k8s/examples/ozone/Flekszible    |   3 +-
 .../main/k8s/examples/ozone/config-configmap.yaml  |   1 +
 .../Flekszible => ozone/kustomization.yaml}        |  26 +-
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   1 +
 .../main/smoketest/auditparser/auditparser.robot   |  22 +-
 .../dist/src/main/smoketest/basic/basic.robot      |   9 +-
 .../debug/ozone-debug-corrupt-block.robot          |  49 ++
 .../ozone-debug-dead-datanode.robot}               |  35 +-
 .../debug/ozone-debug-stale-datanode.robot         |  48 ++
 .../main/smoketest/debug/ozone-debug-tests.robot   |  51 ++
 .../src/main/smoketest/debug/ozone-debug.robot     |  93 +++-
 .../dist/src/main/smoketest/freon/generate.robot   |  19 +-
 .../dist/src/main/smoketest/freon/remove.robot     |  21 +-
 .../dist/src/main/smoketest/freon/validate.robot   |  13 +-
 .../dist/src/main/smoketest/omha/om-prepare.robot  |   3 +-
 .../dist/src/main/smoketest/ozone-lib/freon.robot  |  65 +++
 .../dist/src/main/smoketest/recon/recon-api.robot  |   6 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |  21 +-
 .../dist/src/main/smoketest/s3/bucketlist.robot    |   7 +-
 .../dist/src/main/smoketest/s3/commonawslib.robot  |  36 ++
 .../dist/src/main/smoketest/spnego/web.robot       |   4 -
 .../dist/src/main/smoketest/upgrade/generate.robot |   5 +-
 .../fault-injection-test/mini-chaos-tests/pom.xml  |  25 +
 .../hadoop/ozone/TestAllMiniChaosOzoneCluster.java |   2 +-
 .../hadoop/ozone/insight/TestBaseInsightPoint.java |   7 +-
 hadoop-ozone/integration-test/pom.xml              |   5 +
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |  26 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   4 +-
 .../fs/ozone/TestOzoneFileSystemWithLinks.java     |  14 +-
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |   2 +-
 .../hadoop/hdds/scm/TestRatisPipelineLeader.java   |  23 +-
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   7 +-
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    |  19 +-
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    |   5 +-
 .../TestContainerStateManagerIntegration.java      |  44 +-
 .../metrics/TestSCMContainerManagerMetrics.java    |   4 +-
 .../hdds/scm/pipeline/TestLeaderChoosePolicy.java  |  28 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   2 +-
 .../hadoop/hdds/scm/pipeline/TestNodeFailure.java  |   2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  29 +-
 .../TestRatisPipelineCreateAndDestroy.java         |  31 +-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   6 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   7 +-
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       |  33 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   8 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  20 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   2 +-
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |   5 +-
 ...OutputUtil.java => StandardOutputTestBase.java} |   2 +-
 .../hadoop/ozone/TestContainerOperations.java      |  35 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  36 +-
 .../hadoop/ozone/TestMiniOzoneOMHACluster.java     |   8 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   6 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  13 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |   5 +-
 .../ozone/client/TestOzoneClientFactory.java       |  75 +++
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |   7 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  23 +-
 ...estBlockOutputStreamWithFailuresFlushDelay.java |  23 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  14 +-
 .../client/rpc/TestContainerStateMachine.java      |   4 +-
 .../TestContainerStateMachineFailureOnRead.java    |  21 +-
 .../rpc/TestContainerStateMachineFailures.java     |  13 +-
 .../rpc/TestContainerStateMachineFlushDelay.java   |  14 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  10 +-
 .../ozone/client/rpc/TestECKeyOutputStream.java    |   8 +-
 .../client/rpc/TestFailureHandlingByClient.java    | 144 +++++-
 .../rpc/TestFailureHandlingByClientFlushDelay.java |   8 +-
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  10 +-
 .../client/rpc/TestOzoneAtRestEncryption.java      |  28 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java       |  18 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  41 +-
 .../rpc/TestOzoneRpcClientForAclAuditLog.java      |   3 +-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  10 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  20 +-
 .../ozone/client/rpc/TestWatchForCommit.java       |  12 +-
 .../rpc/read/TestBlockInputStreamFactoryImpl.java  |   2 +-
 .../ozone/client/rpc/read/TestInputStreamBase.java |   4 +-
 .../ozone/client/rpc/read/TestKeyInputStream.java  |   2 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |   8 +-
 .../commandhandler/TestBlockDeletion.java          |  21 +-
 .../TestCloseContainerByPipeline.java              |  24 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +-
 .../commandhandler/TestDeleteContainerHandler.java |   2 +-
 ...ler.java => TestRefreshVolumeUsageHandler.java} | 121 +++--
 .../transport/server/ratis/TestCSMMetrics.java     |   3 +-
 .../container/ozoneimpl/TestOzoneContainer.java    | 154 +++---
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |   6 +-
 .../container/server/TestContainerServer.java      |  23 +-
 .../server/TestSecureContainerServer.java          |  10 -
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |  15 +-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |  16 +-
 .../hadoop/ozone/om/TestAddRemoveOzoneManager.java |  19 +-
 .../apache/hadoop/ozone/om/TestBucketOwner.java    |  27 +-
 .../ozone/om/TestContainerReportWithKeys.java      |   7 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  36 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |   2 -
 .../apache/hadoop/ozone/om/TestObjectStore.java    |  12 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |  82 +++-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |  11 +-
 .../org/apache/hadoop/ozone/om/TestOmInit.java     |   7 +-
 .../org/apache/hadoop/ozone/om/TestOmLDBCli.java   |   5 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   4 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  25 +-
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    |   2 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |  40 +-
 .../hadoop/ozone/om/TestOzoneManagerHAWithACL.java |   2 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |  62 +--
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   4 +-
 .../ozone/om/TestOzoneManagerListVolumes.java      |   2 -
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   |  14 +-
 .../hadoop/ozone/om/TestOzoneManagerRestart.java   |  49 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |   2 +-
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   9 +-
 .../hadoop/ozone/recon/TestReconAsPassiveScm.java  |   8 +-
 .../hadoop/ozone/recon/TestReconScmSnapshot.java   |   2 +-
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |   2 +-
 .../ozone/recon/TestReconWithOzoneManager.java     |   7 +-
 .../ozone/recon/TestReconWithOzoneManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestFailoverWithSCMHA.java    |   5 +-
 .../TestSCMContainerPlacementPolicyMetrics.java    |  18 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    |   4 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   4 +-
 .../ozone/scm/TestStorageContainerManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    |   2 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |  27 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   2 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |   4 +-
 hadoop-ozone/interface-client/pom.xml              |   4 +
 .../src/main/proto/OmClientProtocol.proto          |   3 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   9 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   6 +-
 .../ozone/om/codec/RepeatedOmKeyInfoCodec.java     |   6 +-
 .../hadoop/ozone/om/codec/TestOmKeyInfoCodec.java  |   4 +-
 .../om/codec/TestOmMultipartKeyInfoCodec.java      |   3 +-
 .../ozone/om/codec/TestRepeatedOmKeyInfoCodec.java |   3 +-
 hadoop-ozone/ozone-manager/pom.xml                 |   5 +
 .../org/apache/hadoop/ozone/om/BucketManager.java  |  22 -
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  | 255 ----------
 .../hadoop/ozone/om/DirectoryDeletingService.java  |   7 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   9 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 113 +++--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  20 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  29 +-
 .../hadoop/ozone/om/OpenKeyCleanupService.java     |   3 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  10 +-
 .../apache/hadoop/ozone/om/OzoneManagerUtils.java  |  22 +
 .../hadoop/ozone/om/OzonePrefixPathImpl.java       |  20 +
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |   1 +
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  10 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  15 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  15 +-
 .../om/request/bucket/OMBucketCreateRequest.java   |  12 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |   9 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |   4 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |   5 +-
 .../ozone/om/request/file/OMFileRequest.java       |   4 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  52 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  24 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |   2 +-
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |   2 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  40 +-
 .../om/request/key/OMOpenKeysDeleteRequest.java    |  42 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   2 +-
 .../om/request/key/acl/OMKeyAclRequestWithFSO.java |   2 +-
 .../S3MultipartUploadCompleteRequest.java          |  64 ++-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |  12 +-
 .../security/OMCancelDelegationTokenRequest.java   |  42 +-
 .../security/OMGetDelegationTokenRequest.java      |  54 ++-
 .../security/OMRenewDelegationTokenRequest.java    |  51 +-
 .../validation/RequestFeatureValidator.java        |  99 ++++
 .../request/validation/RequestProcessingPhase.java |  36 +-
 .../om/request/validation/RequestValidations.java  | 107 +++++
 .../om/request/validation/ValidationCondition.java |  55 +++
 .../om/request/validation/ValidationContext.java   |  52 ++
 .../om/request/validation/ValidatorRegistry.java   | 201 ++++++++
 .../ozone/om/request/validation/package-info.java  |  62 +++
 .../om/response/bucket/OMBucketCreateResponse.java |   3 +-
 .../om/response/bucket/OMBucketDeleteResponse.java |   3 +-
 .../response/file/OMFileCreateResponseWithFSO.java |   4 +-
 .../om/response/key/OMAllocateBlockResponse.java   |   3 +-
 .../key/OMAllocateBlockResponseWithFSO.java        |   3 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |   4 +-
 .../response/key/OMKeyCommitResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyCreateResponse.java |   3 +-
 .../response/key/OMKeyCreateResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |   3 +-
 .../response/key/OMKeyDeleteResponseWithFSO.java   |   3 +-
 .../response/key/OMKeyRenameResponseWithFSO.java   |   3 +-
 .../om/response/key/OMKeysDeleteResponse.java      |   4 +-
 .../om/response/key/OMKeysRenameResponse.java      |  10 +
 .../om/response/key/OMOpenKeysDeleteResponse.java  |  18 +-
 .../multipart/S3MultipartUploadAbortResponse.java  |   9 +-
 .../S3MultipartUploadCommitPartResponse.java       |   9 +-
 .../S3MultipartUploadCompleteResponse.java         |  36 +-
 .../S3MultipartUploadCompleteResponseWithFSO.java  |   9 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  25 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   2 +
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |  11 +
 .../ozone/om/ScmBlockLocationTestingClient.java    |   2 +-
 .../hadoop/ozone/om/TestBucketManagerImpl.java     | 243 +++++-----
 .../hadoop/ozone/om/TestKeyDeletingService.java    |   2 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  14 +-
 .../hadoop/ozone/om/TestOmMetadataManager.java     |  50 +-
 .../hadoop/ozone/om/TestOzoneConfigUtil.java       |   2 +-
 .../hadoop/ozone/om/TestOzoneManagerStarter.java   |   4 +-
 .../apache/hadoop/ozone/om/TestTrashService.java   |   4 +-
 .../ozone/om/request/OMRequestTestUtils.java       |  16 +
 .../bucket/TestOMBucketCreateRequestWithFSO.java   |   4 +
 .../bucket/TestOMBucketDeleteRequestWithFSO.java   |  76 +++
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |  97 +++-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   2 +-
 .../request/key/TestOMOpenKeysDeleteRequest.java   | 174 ++++---
 .../TestS3MultipartUploadCompleteRequest.java      |  32 +-
 .../security/TestOMGetDelegationTokenRequest.java  |  10 +-
 .../TestRequestFeatureValidatorProcessor.java      | 524 +++++++++++++++++++++
 .../request/validation/TestRequestValidations.java | 349 ++++++++++++++
 .../request/validation/TestValidatorRegistry.java  | 215 +++++++++
 .../GeneralValidatorsForTesting.java               | 190 ++++++++
 .../ValidatorsForOnlyOldClientValidations.java     |  43 ++
 .../om/response/key/TestOMKeyDeleteResponse.java   |   3 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |  61 ++-
 .../s3/multipart/TestS3MultipartResponse.java      |  19 +-
 ...stS3MultipartUploadCompleteResponseWithFSO.java |  17 +-
 .../ozone/security/TestAWSV4AuthValidator.java     |   2 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |   8 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |   2 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java |   2 +-
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |   2 +-
 .../hadoop/fs/ozone/TestOzoneClientUtils.java      |   4 +-
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |   1 +
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |   3 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |   1 +
 hadoop-ozone/ozonefs-shaded/pom.xml                |   5 +
 hadoop-ozone/recon-codegen/pom.xml                 |   4 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |   2 +
 .../org/apache/hadoop/ozone/recon/ReconServer.java | 125 +++++
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  29 ++
 .../ozone/recon/api/ClusterStateEndpoint.java      |  15 +-
 .../recon/metrics/OzoneManagerSyncMetrics.java     |  12 +
 .../recon/metrics/ReconTaskStatusMetrics.java      |  83 ++++
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |  18 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |   4 +-
 .../hadoop/ozone/recon/scm/ReconStorageConfig.java |  61 ++-
 .../scm/ReconStorageContainerManagerFacade.java    |  11 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  14 +-
 .../impl/StorageContainerServiceProviderImpl.java  |  20 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |  12 +-
 .../ozone/recon/api/TestContainerEndpoint.java     |   2 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |   7 +-
 .../ozone/recon/api/TestOpenContainerCount.java    |  16 +-
 .../recon/fsck/TestContainerHealthStatus.java      |   4 +-
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   6 +-
 .../TestContainerHealthTaskRecordGenerator.java    |   3 +-
 .../recovery/TestReconOmMetadataManagerImpl.java   |   4 +-
 .../scm/AbstractReconContainerManagerTest.java     |  20 +-
 .../ozone/recon/scm/TestReconContainerManager.java |   2 +-
 .../ozone/recon/scm/TestReconNodeManager.java      |  12 +-
 .../ozone/recon/scm/TestReconPipelineManager.java  |  15 +-
 .../impl/TestReconNamespaceSummaryManagerImpl.java |   6 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |   4 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   4 +-
 .../ozone/recon/tasks/TestOMDBUpdatesHandler.java  |   4 +-
 .../java/org/apache/hadoop/ozone/s3/Gateway.java   |   9 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  24 +-
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |  13 +
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |  12 +-
 .../ozone/s3/endpoint/ListBucketResponse.java      |   6 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  34 +-
 .../hadoop/ozone/s3/endpoint/RootEndpoint.java     |   9 +-
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   9 +
 .../hadoop/ozone/s3/metrics/S3GatewayMetrics.java  | 320 +++++++++++++
 .../hadoop/ozone/s3/metrics/package-info.java      |  23 +-
 .../hadoop/ozone/client/OzoneBucketStub.java       |   2 +-
 .../ozone/s3/metrics/TestS3GatewayMetrics.java     | 113 +++++
 .../apache/hadoop/ozone/s3/util/TestS3Utils.java   |   4 +-
 .../apache/hadoop/ozone/debug/ChunkKeyHandler.java |   3 +-
 .../apache/hadoop/ozone/debug/ReadReplicas.java    |   5 +-
 .../ozone/freon/LeaderAppendLogEntryGenerator.java |   2 +-
 .../apache/hadoop/ozone/freon/OmKeyGenerator.java  |   2 +-
 .../hadoop/ozone/freon/SCMThroughputBenchmark.java |   2 +-
 .../freon/containergenerator/GeneratorOm.java      |   5 +-
 .../freon/containergenerator/GeneratorScm.java     |   4 +-
 .../apache/hadoop/ozone/fsck/ContainerMapper.java  |   5 +-
 .../apache/hadoop/ozone/freon/TestProgressBar.java |   6 +-
 pom.xml                                            |  85 +++-
 580 files changed, 11454 insertions(+), 4601 deletions(-)
 create mode 100644 dev-support/annotations/pom.xml
 create mode 100644 dev-support/annotations/src/main/java/org/apache/ozone/annotations/RequestFeatureValidatorProcessor.java
 create mode 100644 dev-support/annotations/src/main/java/org/apache/ozone/annotations/package-info.java
 rename .github/workflows/cancel-ci.yaml => dev-support/annotations/src/main/resources/META-INF/services/javax.annotation.processing.Processor (61%)
 copy hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/{DatanodeVersions.java => ComponentVersion.java} (67%)
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersion.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/ClientVersion.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneManagerVersion.java
 create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestComponentVersionInvariants.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerInspector.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/IncrementalReportSender.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerInspectorUtil.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainerMetadataInspector.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/RefreshVolumeUsageCommand.java
 copy hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/{TestKeyValueContainerCheck.java => TestKeyValueContainerIntegrityChecks.java} (52%)
 create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/TestKeyValueContainerMetadataInspector.java
 create mode 100644 hadoop-hdds/docs/dev-support/bin/make_images_responsive.py
 copy hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/{OMCertificateClient.java => CommonCertificateClient.java} (51%)
 copy hadoop-hdds/{common/src/main/java/org/apache/hadoop/ozone/ClientVersions.java => framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/ReconCertificateClient.java} (51%)
 rename hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/{FixedThreadPoolExecutor.java => FixedThreadPoolWithAffinityExecutor.java} (59%)
 delete mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/cache/EpochEntry.java
 rename hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/{MockSCMHADBTransactionBuffer.java => SCMHADBTransactionBufferStub.java} (92%)
 rename hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/{MockSCMHAManager.java => SCMHAManagerStub.java} (91%)
 copy hadoop-hdds/{common/src/main/java/org/apache/hadoop/ozone/ClientVersions.java => test-utils/src/main/java/org/apache/ozone/test/tag/Flaky.java} (56%)
 rename hadoop-hdds/{common/src/main/java/org/apache/hadoop/ozone/ClientVersions.java => test-utils/src/main/java/org/apache/ozone/test/tag/Slow.java} (59%)
 copy hadoop-hdds/{common/src/main/java/org/apache/hadoop/hdds/DatanodeVersions.java => test-utils/src/main/java/org/apache/ozone/test/tag/package-info.java} (62%)
 create mode 100644 hadoop-ozone/dist/src/main/k8s/README.md
 copy hadoop-ozone/dist/src/main/k8s/examples/{ozone-ha/Flekszible => getting-started/kustomization.yaml} (71%)
 copy hadoop-ozone/dist/src/main/k8s/examples/{ozone-ha/Flekszible => minikube/kustomization.yaml} (71%)
 copy hadoop-ozone/dist/src/main/{compose/ozone/prometheus.yml => k8s/examples/ozone-dev/kustomization.yaml} (56%)
 copy hadoop-ozone/dist/src/main/k8s/examples/ozone-ha/{Flekszible => kustomization.yaml} (71%)
 copy hadoop-ozone/dist/src/main/k8s/examples/{ozone-ha/Flekszible => ozone/kustomization.yaml} (71%)
 create mode 100644 hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-corrupt-block.robot
 copy hadoop-ozone/dist/src/main/smoketest/{freon/validate.robot => debug/ozone-debug-dead-datanode.robot} (50%)
 create mode 100644 hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-stale-datanode.robot
 create mode 100644 hadoop-ozone/dist/src/main/smoketest/debug/ozone-debug-tests.robot
 create mode 100644 hadoop-ozone/dist/src/main/smoketest/ozone-lib/freon.robot
 rename hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/{TestStandardOutputUtil.java => StandardOutputTestBase.java} (98%)
 create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/TestOzoneClientFactory.java
 copy hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/{TestCloseContainerHandler.java => TestRefreshVolumeUsageHandler.java} (55%)
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/RequestFeatureValidator.java
 copy hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/update/client/UpdateServiceConfig.java => hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/RequestProcessingPhase.java (54%)
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/RequestValidations.java
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/ValidationCondition.java
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/ValidationContext.java
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/ValidatorRegistry.java
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/validation/package-info.java
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/bucket/TestOMBucketDeleteRequestWithFSO.java
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/validation/TestRequestFeatureValidatorProcessor.java
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/validation/TestRequestValidations.java
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/validation/TestValidatorRegistry.java
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/validation/testvalidatorset1/GeneralValidatorsForTesting.java
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/validation/testvalidatorset2/ValidatorsForOnlyOldClientValidations.java
 create mode 100644 hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/metrics/ReconTaskStatusMetrics.java
 create mode 100644 hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/S3GatewayMetrics.java
 rename hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/DatanodeVersions.java => hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/metrics/package-info.java (62%)
 create mode 100644 hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/metrics/TestS3GatewayMetrics.java

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org


[ozone] 01/01: Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 9ec90f5820aa9143393f5aa392ed9c1f234e9c2c
Merge: f386662 75f5501
Author: Doroszlai, Attila <ad...@apache.org>
AuthorDate: Thu Mar 24 19:19:08 2022 +0100

    Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master
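
The two abbreviated hashes on the "Merge:" line above are the parents
of this merge commit. With a local clone they can be verified with,
for example:

    # print the merge commit followed by its two parent hashes
    git rev-list --parents -n 1 9ec90f5820aa9143393f5aa392ed9c1f234e9c2c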

 .../src/main/proto/DatanodeClientProtocol.proto    |   4 +-
 .../interface-client/src/main/proto/hdds.proto     |   7 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |  11 +-
 .../src/main/proto/ScmServerSecurityProtocol.proto |   8 +-
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |   2 +-
 .../container/AbstractContainerReportHandler.java  |  43 +-
 .../hdds/scm/container/ContainerManager.java       |   7 +
 .../hdds/scm/container/ContainerManagerImpl.java   |  78 +--
 .../hdds/scm/container/ContainerReportHandler.java | 141 ++++--
 .../hdds/scm/container/ContainerStateManager.java  |  10 +-
 .../scm/container/ContainerStateManagerImpl.java   |  24 +-
 .../IncrementalContainerReportHandler.java         |  18 +-
 .../hdds/scm/container/ReplicationManager.java     |   4 +-
 .../scm/container/balancer/ContainerBalancer.java  | 133 ++++--
 .../balancer/ContainerBalancerConfiguration.java   |  23 +-
 .../balancer/ContainerBalancerMetrics.java         | 139 +++---
 .../scm/container/states/ContainerStateMap.java    |  27 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   2 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java |  37 +-
 .../hadoop/hdds/scm/ha/InterSCMGrpcClient.java     |   4 +-
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |   8 +-
 .../hdds/scm/ha/SCMDBCheckpointProvider.java       |   2 +-
 ...ffer.java => SCMHADBTransactionBufferStub.java} |   8 +-
 ...MockSCMHAManager.java => SCMHAManagerStub.java} |  24 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   4 +-
 .../hadoop/hdds/scm/ha/SCMServiceManager.java      |   3 +-
 .../hdds/scm/metadata/MoveDataNodePairCodec.java   |   6 +-
 .../hadoop/hdds/scm/metadata/PipelineCodec.java    |   6 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |   9 +-
 .../hadoop/hdds/scm/node/DatanodeUsageInfo.java    |  15 +-
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  18 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  18 +-
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  34 +-
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   2 +-
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |   4 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |   4 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  54 +--
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  11 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   6 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   3 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  28 ++
 ...inerLocationProtocolServerSideTranslatorPB.java |   9 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |   2 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |   2 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  12 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |  20 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  12 +-
 .../hadoop/hdds/scm/server/SCMPolicyProvider.java  |   5 +-
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  16 +-
 .../hadoop/hdds/scm/server/SCMStorageConfig.java   |   2 +-
 .../hdds/scm/server/StorageContainerManager.java   |  51 +-
 .../org/apache/hadoop/hdds/scm/HddsTestUtils.java  |  14 +-
 .../hadoop/hdds/scm/TestHddsServerUtils.java       |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   7 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |   8 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  21 +
 .../hdds/scm/container/SimpleMockNodeManager.java  |  20 +-
 .../container/TestCloseContainerEventHandler.java  |   8 +-
 .../scm/container/TestContainerManagerImpl.java    |  26 +-
 .../scm/container/TestContainerReportHandler.java  |  34 +-
 .../scm/container/TestContainerStateManager.java   |  14 +-
 .../TestIncrementalContainerReportHandler.java     |  30 +-
 .../hdds/scm/container/TestReplicationManager.java | 140 +++---
 .../scm/container/TestUnknownContainerReport.java  |   4 +-
 .../container/balancer/TestContainerBalancer.java  |  84 ++--
 .../states/TestContainerReplicaCount.java          |   5 +-
 .../hdds/scm/ha/TestReplicationAnnotation.java     |  10 +-
 .../hdds/scm/ha/TestSequenceIDGenerator.java       |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   6 +-
 .../hdds/scm/node/TestDatanodeAdminMonitor.java    |   7 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   3 +-
 .../hadoop/hdds/scm/node/TestNodeStateManager.java |  24 +
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  12 +-
 .../hdds/scm/node/TestSCMNodeStorageStatMap.java   |   5 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   4 +-
 .../TestPipelineDatanodesIntersection.java         |  13 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java | 129 ++---
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 158 ++++++-
 .../scm/pipeline/TestPipelineStateManagerImpl.java |  62 +--
 .../hdds/scm/pipeline/TestPipelineStateMap.java    |  12 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  57 ++-
 .../scm/pipeline/TestSimplePipelineProvider.java   |  18 +-
 .../pipeline/TestWritableECContainerProvider.java  |  16 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  20 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |  10 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  22 +-
 .../scm/server/TestSCMBlockProtocolServer.java     |  10 +-
 .../server/TestSCMDatanodeHeartbeatDispatcher.java |  54 +--
 .../server/TestStorageContainerManagerStarter.java |   6 +-
 .../testutils/ReplicationNodeManagerMock.java      |  14 +
 hadoop-hdds/test-utils/pom.xml                     |   5 +
 .../org/apache/ozone/test/GenericTestUtils.java    |  18 +-
 .../java/org/apache/ozone/test/tag/Flaky.java}     |  36 +-
 .../main/java/org/apache/ozone/test/tag/Slow.java} |  35 +-
 .../org/apache/ozone/test/tag/package-info.java}   |  23 +-
 .../scm/cli/ContainerBalancerStartSubcommand.java  |   8 +-
 .../hdds/scm/cli/ContainerOperationClient.java     |   9 +-
 .../hadoop/hdds/scm/cli/cert/CertCommands.java     |   4 +-
 .../hdds/scm/cli/datanode/ListInfoSubcommand.java  |  38 +-
 .../hdds/scm/cli/container/TestInfoSubCommand.java |   6 +-
 .../datanode/TestContainerBalancerSubCommand.java  |   6 +-
 .../cli/pipeline/TestListPipelinesSubCommand.java  |  12 +-
 .../hadoop/ozone/client/OzoneClientFactory.java    |   5 +-
 .../client/checksum/BaseFileChecksumHelper.java    |  54 ++-
 .../checksum/ReplicatedFileChecksumHelper.java     |  44 +-
 .../client/io/BlockOutputStreamEntryPool.java      |   1 -
 .../hadoop/ozone/client/io/ECBlockInputStream.java |   7 +-
 .../client/io/MultipartCryptoKeyInputStream.java   |   4 +
 .../ozone/client/protocol/ClientProtocol.java      |   4 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  46 +-
 .../checksum/TestReplicatedFileChecksumHelper.java |  39 +-
 .../hadoop/ozone/client/rpc/RpcClientTest.java     | 279 +++++------
 .../hadoop/ozone/client/rpc/TestOzoneKMSUtil.java  |   3 +-
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   4 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |  15 +
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 130 ++---
 .../apache/hadoop/ozone/om/helpers/DBUpdates.java  |  10 +
 .../hadoop/ozone/om/helpers/OmDirectoryInfo.java   |   7 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  16 +-
 .../ozone/om/helpers/OmKeyLocationInfoGroup.java   |   3 +-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |   4 +-
 .../hadoop/ozone/om/helpers/ServiceInfo.java       |  25 +-
 .../ozone/om/protocolPB/Hadoop3OmTransport.java    |   3 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  21 +-
 .../apache/hadoop/ozone/protocolPB/OMPBHelper.java |  37 +-
 .../apache/hadoop/ozone/security/acl/OzoneObj.java |   3 +-
 .../apache/hadoop/ozone/web/utils/OzoneUtils.java  |  18 +-
 .../org/apache/hadoop/ozone/TestOzoneAcls.java     |  10 +-
 .../ozone/om/ha/TestOMFailoverProxyProvider.java   |   8 +-
 .../hadoop/ozone/om/helpers/TestOmBucketInfo.java  |  18 +-
 .../hadoop/ozone/om/helpers/TestOmKeyInfo.java     |  24 +-
 .../ozone/om/helpers/TestOmMultipartKeyInfo.java   |   5 +-
 .../hadoop/ozone/om/helpers/TestQuotaUtil.java     |   5 +-
 .../ozone/security/acl/TestOzoneObjInfo.java       |   5 +-
 hadoop-ozone/csi/pom.xml                           |   4 +
 hadoop-ozone/datanode/pom.xml                      |   1 +
 hadoop-ozone/dev-support/checks/_lib.sh            |   2 +-
 .../dev-support/checks/_mvn_unit_report.sh         |  24 +-
 hadoop-ozone/dev-support/checks/acceptance.sh      |   3 +-
 hadoop-ozone/dev-support/checks/integration.sh     |   2 +-
 hadoop-ozone/dev-support/checks/rat.sh             |   4 +-
 hadoop-ozone/dist/pom.xml                          |   2 +-
 .../dist/src/main/compose/ozone-topology/test.sh   |   6 -
 .../dist/src/main/compose/ozone/docker-config      |   4 +
 .../dist/src/main/compose/ozone/prometheus.yml     |   1 +
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |   4 +
 .../src/main/compose/ozonesecure/docker-config     |   4 +
 .../dist/src/main/compose/ozonesecure/test.sh      |   2 +-
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  98 +++-
 .../dist/src/main/compose/upgrade/testlib.sh       |   9 +-
 .../non-rolling-upgrade/1.1.0-1.2.0/callback.sh    |   6 +
 .../dist/src/main/compose/xcompat/clients.yaml     |  18 +
 hadoop-ozone/dist/src/main/compose/xcompat/test.sh |  17 +-
 hadoop-ozone/dist/src/main/k8s/README.md           |  68 +++
 .../k8s/definitions/ozone-csi/csi-controller.yaml  |   2 +-
 .../main/k8s/examples/getting-started/Flekszible   |   2 +
 .../examples/getting-started/config-configmap.yaml |   1 +
 .../kustomization.yaml}                            |  30 +-
 .../dist/src/main/k8s/examples/minikube/Flekszible |   2 +
 .../k8s/examples/minikube/config-configmap.yaml    |   1 +
 .../Flekszible => minikube/kustomization.yaml}     |  30 +-
 .../src/main/k8s/examples/ozone-dev/Flekszible     |   1 +
 .../k8s/examples/ozone-dev/config-configmap.yaml   |   1 +
 .../examples/ozone-dev/kustomization.yaml}         |  42 +-
 .../dist/src/main/k8s/examples/ozone-ha/Flekszible |   3 +
 .../ozone-ha/{Flekszible => kustomization.yaml}    |  26 +-
 .../dist/src/main/k8s/examples/ozone/Flekszible    |   3 +-
 .../main/k8s/examples/ozone/config-configmap.yaml  |   1 +
 .../Flekszible => ozone/kustomization.yaml}        |  26 +-
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   1 +
 .../main/smoketest/auditparser/auditparser.robot   |  22 +-
 .../dist/src/main/smoketest/basic/basic.robot      |   9 +-
 .../debug/ozone-debug-corrupt-block.robot          |  49 ++
 .../ozone-debug-dead-datanode.robot}               |  35 +-
 .../debug/ozone-debug-stale-datanode.robot         |  48 ++
 .../main/smoketest/debug/ozone-debug-tests.robot   |  51 ++
 .../src/main/smoketest/debug/ozone-debug.robot     |  93 +++-
 .../dist/src/main/smoketest/freon/generate.robot   |  19 +-
 .../dist/src/main/smoketest/freon/remove.robot     |  21 +-
 .../dist/src/main/smoketest/freon/validate.robot   |  13 +-
 .../dist/src/main/smoketest/omha/om-prepare.robot  |   3 +-
 .../dist/src/main/smoketest/ozone-lib/freon.robot  |  65 +++
 .../dist/src/main/smoketest/recon/recon-api.robot  |   6 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |  21 +-
 .../dist/src/main/smoketest/s3/bucketlist.robot    |   7 +-
 .../dist/src/main/smoketest/s3/commonawslib.robot  |  36 ++
 .../dist/src/main/smoketest/spnego/web.robot       |   4 -
 .../dist/src/main/smoketest/upgrade/generate.robot |   5 +-
 .../fault-injection-test/mini-chaos-tests/pom.xml  |  25 +
 .../hadoop/ozone/TestAllMiniChaosOzoneCluster.java |   2 +-
 .../hadoop/ozone/insight/TestBaseInsightPoint.java |   7 +-
 hadoop-ozone/integration-test/pom.xml              |   5 +
 .../ozone/TestDirectoryDeletingServiceWithFSO.java |  26 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |   4 +-
 .../fs/ozone/TestOzoneFileSystemWithLinks.java     |  14 +-
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |   2 +-
 .../hadoop/hdds/scm/TestRatisPipelineLeader.java   |  23 +-
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   7 +-
 .../hadoop/hdds/scm/TestSCMInstallSnapshot.java    |  19 +-
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    |   5 +-
 .../TestContainerStateManagerIntegration.java      |  44 +-
 .../metrics/TestSCMContainerManagerMetrics.java    |   4 +-
 .../hdds/scm/pipeline/TestLeaderChoosePolicy.java  |  28 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   2 +-
 .../hadoop/hdds/scm/pipeline/TestNodeFailure.java  |   2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |  29 +-
 .../TestRatisPipelineCreateAndDestroy.java         |  31 +-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   6 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   7 +-
 .../hadoop/hdds/upgrade/TestHDDSUpgrade.java       |  33 +-
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   8 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  20 +-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   2 +-
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |   5 +-
 ...OutputUtil.java => StandardOutputTestBase.java} |   2 +-
 .../hadoop/ozone/TestContainerOperations.java      |  35 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  36 +-
 .../hadoop/ozone/TestMiniOzoneOMHACluster.java     |   8 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   6 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  13 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |   5 +-
 .../ozone/client/TestOzoneClientFactory.java       |  75 +++
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |   7 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  23 +-
 ...estBlockOutputStreamWithFailuresFlushDelay.java |  23 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  14 +-
 .../client/rpc/TestContainerStateMachine.java      |   4 +-
 .../TestContainerStateMachineFailureOnRead.java    |  21 +-
 .../rpc/TestContainerStateMachineFailures.java     |  13 +-
 .../rpc/TestContainerStateMachineFlushDelay.java   |  14 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  10 +-
 .../ozone/client/rpc/TestECKeyOutputStream.java    |   8 +-
 .../client/rpc/TestFailureHandlingByClient.java    | 144 +++++-
 .../rpc/TestFailureHandlingByClientFlushDelay.java |   8 +-
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  10 +-
 .../client/rpc/TestOzoneAtRestEncryption.java      |  28 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java       |  18 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  41 +-
 .../rpc/TestOzoneRpcClientForAclAuditLog.java      |   3 +-
 .../client/rpc/TestOzoneRpcClientWithRatis.java    |  10 +-
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  20 +-
 .../ozone/client/rpc/TestWatchForCommit.java       |  12 +-
 .../rpc/read/TestBlockInputStreamFactoryImpl.java  |   2 +-
 .../ozone/client/rpc/read/TestInputStreamBase.java |   4 +-
 .../ozone/client/rpc/read/TestKeyInputStream.java  |   2 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |   8 +-
 .../commandhandler/TestBlockDeletion.java          |  21 +-
 .../TestCloseContainerByPipeline.java              |  24 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +-
 .../commandhandler/TestDeleteContainerHandler.java |   2 +-
 ...ler.java => TestRefreshVolumeUsageHandler.java} | 121 +++--
 .../transport/server/ratis/TestCSMMetrics.java     |   3 +-
 .../container/ozoneimpl/TestOzoneContainer.java    | 154 +++---
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |   6 +-
 .../container/server/TestContainerServer.java      |  23 +-
 .../server/TestSecureContainerServer.java          |  10 -
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |  15 +-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |  16 +-
 .../hadoop/ozone/om/TestAddRemoveOzoneManager.java |  19 +-
 .../apache/hadoop/ozone/om/TestBucketOwner.java    |  27 +-
 .../ozone/om/TestContainerReportWithKeys.java      |   7 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |  36 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |   2 -
 .../apache/hadoop/ozone/om/TestObjectStore.java    |  12 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |  82 +++-
 .../hadoop/ozone/om/TestOmBlockVersioning.java     |  11 +-
 .../org/apache/hadoop/ozone/om/TestOmInit.java     |   7 +-
 .../org/apache/hadoop/ozone/om/TestOmLDBCli.java   |   5 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  |   4 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  25 +-
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    |   2 +-
 .../ozone/om/TestOzoneManagerHAMetadataOnly.java   |  40 +-
 .../hadoop/ozone/om/TestOzoneManagerHAWithACL.java |   2 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |  62 +--
 .../ozone/om/TestOzoneManagerHAWithFailover.java   |   4 +-
 .../ozone/om/TestOzoneManagerListVolumes.java      |   2 -
 .../hadoop/ozone/om/TestOzoneManagerPrepare.java   |  14 +-
 .../hadoop/ozone/om/TestOzoneManagerRestart.java   |  49 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |   2 +-
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |   9 +-
 .../hadoop/ozone/recon/TestReconAsPassiveScm.java  |   8 +-
 .../hadoop/ozone/recon/TestReconScmSnapshot.java   |   2 +-
 .../apache/hadoop/ozone/recon/TestReconTasks.java  |   2 +-
 .../ozone/recon/TestReconWithOzoneManager.java     |   7 +-
 .../ozone/recon/TestReconWithOzoneManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestFailoverWithSCMHA.java    |   5 +-
 .../TestSCMContainerPlacementPolicyMetrics.java    |  18 +-
 .../ozone/scm/TestSCMInstallSnapshotWithHA.java    |   4 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   4 +-
 .../ozone/scm/TestStorageContainerManagerHA.java   |   2 +-
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    |   2 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |  27 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   2 +-
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |   4 +-
 hadoop-ozone/interface-client/pom.xml              |   4 +
 .../src/main/proto/OmClientProtocol.proto          |   3 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   9 +-
 .../hadoop/ozone/om/codec/OmKeyInfoCodec.java      |   6 +-
 .../ozone/om/codec/RepeatedOmKeyInfoCodec.java     |   6 +-
 .../hadoop/ozone/om/codec/TestOmKeyInfoCodec.java  |   4 +-
 .../om/codec/TestOmMultipartKeyInfoCodec.java      |   3 +-
 .../ozone/om/codec/TestRepeatedOmKeyInfoCodec.java |   3 +-
 hadoop-ozone/ozone-manager/pom.xml                 |   5 +
 .../org/apache/hadoop/ozone/om/BucketManager.java  |  22 -
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  | 255 ----------
 .../hadoop/ozone/om/DirectoryDeletingService.java  |   7 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |   9 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 113 +++--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  20 +
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  29 +-
 .../hadoop/ozone/om/OpenKeyCleanupService.java     |   3 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  10 +-
 .../apache/hadoop/ozone/om/OzoneManagerUtils.java  |  22 +
 .../hadoop/ozone/om/OzonePrefixPathImpl.java       |  20 +
 .../hadoop/ozone/om/TrashOzoneFileSystem.java      |   1 +
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |  10 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  15 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  15 +-
 .../om/request/bucket/OMBucketCreateRequest.java   |  12 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |   9 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |   4 +-
 .../file/OMDirectoryCreateRequestWithFSO.java      |   5 +-
 .../ozone/om/request/file/OMFileRequest.java       |   4 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  52 +-
 .../om/request/key/OMKeyCommitRequestWithFSO.java  |  24 +-
 .../om/request/key/OMKeyDeleteRequestWithFSO.java  |   2 +-
 .../om/request/key/OMKeyRenameRequestWithFSO.java  |   2 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  40 +-
 .../om/request/key/OMOpenKeysDeleteRequest.java    |  42 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   2 +-
 .../om/request/key/acl/OMKeyAclRequestWithFSO.java |   2 +-
 .../S3MultipartUploadCompleteRequest.java          |  64 ++-
 .../S3MultipartUploadCompleteRequestWithFSO.java   |  12 +-
 .../security/OMCancelDelegationTokenRequest.java   |  42 +-
 .../security/OMGetDelegationTokenRequest.java      |  54 ++-
 .../security/OMRenewDelegationTokenRequest.java    |  51 +-
 .../validation/RequestFeatureValidator.java        |  99 ++++
 .../request/validation/RequestProcessingPhase.java |  36 +-
 .../om/request/validation/RequestValidations.java  | 107 +++++
 .../om/request/validation/ValidationCondition.java |  55 +++
 .../om/request/validation/ValidationContext.java   |  52 ++
 .../om/request/validation/ValidatorRegistry.java   | 201 ++++++++
 .../ozone/om/request/validation/package-info.java  |  62 +++
 .../om/response/bucket/OMBucketCreateResponse.java |   3 +-
 .../om/response/bucket/OMBucketDeleteResponse.java |   3 +-
 .../response/file/OMFileCreateResponseWithFSO.java |   4 +-
 .../om/response/key/OMAllocateBlockResponse.java   |   3 +-
 .../key/OMAllocateBlockResponseWithFSO.java        |   3 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |   4 +-
 .../response/key/OMKeyCommitResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyCreateResponse.java |   3 +-
 .../response/key/OMKeyCreateResponseWithFSO.java   |   4 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |   3 +-
 .../response/key/OMKeyDeleteResponseWithFSO.java   |   3 +-
 .../response/key/OMKeyRenameResponseWithFSO.java   |   3 +-
 .../om/response/key/OMKeysDeleteResponse.java      |   4 +-
 .../om/response/key/OMKeysRenameResponse.java      |  10 +
 .../om/response/key/OMOpenKeysDeleteResponse.java  |  18 +-
 .../multipart/S3MultipartUploadAbortResponse.java  |   9 +-
 .../S3MultipartUploadCommitPartResponse.java       |   9 +-
 .../S3MultipartUploadCompleteResponse.java         |  36 +-
 .../S3MultipartUploadCompleteResponseWithFSO.java  |   9 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  25 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   2 +
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |  11 +
 .../ozone/om/ScmBlockLocationTestingClient.java    |   2 +-
 .../hadoop/ozone/om/TestBucketManagerImpl.java     | 243 +++++-----
 .../hadoop/ozone/om/TestKeyDeletingService.java    |   2 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  14 +-
 .../hadoop/ozone/om/TestOmMetadataManager.java     |  50 +-
 .../hadoop/ozone/om/TestOzoneConfigUtil.java       |   2 +-
 .../hadoop/ozone/om/TestOzoneManagerStarter.java   |   4 +-
 .../apache/hadoop/ozone/om/TestTrashService.java   |   4 +-
 .../ozone/om/request/OMRequestTestUtils.java       |  16 +
 .../bucket/TestOMBucketCreateRequestWithFSO.java   |   4 +
 .../bucket/TestOMBucketDeleteRequestWithFSO.java   |  76 +++
 .../request/key/TestOMKeyDeleteRequestWithFSO.java |  97 +++-
 .../ozone/om/request/key/TestOMKeyRequest.java     |   2 +-
 .../request/key/TestOMOpenKeysDeleteRequest.java   | 174 ++++---
 .../TestS3MultipartUploadCompleteRequest.java      |  32 +-
 .../security/TestOMGetDelegationTokenRequest.java  |  10 +-
 .../TestRequestFeatureValidatorProcessor.java      | 524 +++++++++++++++++++++
 .../request/validation/TestRequestValidations.java | 349 ++++++++++++++
 .../request/validation/TestValidatorRegistry.java  | 215 +++++++++
 .../GeneralValidatorsForTesting.java               | 190 ++++++++
 .../ValidatorsForOnlyOldClientValidations.java     |  43 ++
 .../om/response/key/TestOMKeyDeleteResponse.java   |   3 +-
 .../response/key/TestOMOpenKeysDeleteResponse.java |  61 ++-
 .../s3/multipart/TestS3MultipartResponse.java      |  19 +-
 ...stS3MultipartUploadCompleteResponseWithFSO.java |  17 +-
 .../ozone/security/TestAWSV4AuthValidator.java     |   2 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |   8 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |   2 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java |   2 +-
 .../apache/hadoop/fs/ozone/OzoneClientUtils.java   |   2 +-
 .../hadoop/fs/ozone/TestOzoneClientUtils.java      |   4 +-
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |   1 +
 .../hadoop/fs/ozone/Hadoop27RpcTransport.java      |   3 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |   1 +
 hadoop-ozone/ozonefs-shaded/pom.xml                |   5 +
 hadoop-ozone/recon-codegen/pom.xml                 |   4 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |   2 +
 .../org/apache/hadoop/ozone/recon/ReconServer.java | 125 +++++
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  29 ++
 .../ozone/recon/api/ClusterStateEndpoint.java      |  15 +-
 .../recon/metrics/OzoneManagerSyncMetrics.java     |  12 +
 .../recon/metrics/ReconTaskStatusMetrics.java      |  83 ++++
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |  18 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |   4 +-
 .../hadoop/ozone/recon/scm/ReconStorageConfig.java |  61 ++-
 .../scm/ReconStorageContainerManagerFacade.java    |  11 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  14 +-
 .../impl/StorageContainerServiceProviderImpl.java  |  20 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |  12 +-
 .../ozone/recon/api/TestContainerEndpoint.java     |   2 +-
 .../hadoop/ozone/recon/api/TestEndpoints.java      |   7 +-
 .../ozone/recon/api/TestOpenContainerCount.java    |  16 +-
 .../recon/fsck/TestContainerHealthStatus.java      |   4 +-
 .../ozone/recon/fsck/TestContainerHealthTask.java  |   6 +-
 .../TestContainerHealthTaskRecordGenerator.java    |   3 +-
 .../recovery/TestReconOmMetadataManagerImpl.java   |   4 +-
 .../scm/AbstractReconContainerManagerTest.java     |  20 +-
 .../ozone/recon/scm/TestReconContainerManager.java |   2 +-
 .../ozone/recon/scm/TestReconNodeManager.java      |  12 +-
 .../ozone/recon/scm/TestReconPipelineManager.java  |  15 +-
 .../impl/TestReconNamespaceSummaryManagerImpl.java |   6 +-
 .../recon/tasks/TestContainerKeyMapperTask.java    |   4 +-
 .../ozone/recon/tasks/TestNSSummaryTask.java       |   4 +-
 .../ozone/recon/tasks/TestOMDBUpdatesHandler.java  |   4 +-
 .../java/org/apache/hadoop/ozone/s3/Gateway.java   |   9 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  24 +-
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |  13 +
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |  12 +-
 .../ozone/s3/endpoint/ListBucketResponse.java      |   6 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  34 +-
 .../hadoop/ozone/s3/endpoint/RootEndpoint.java     |   9 +-
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   9 +
 .../hadoop/ozone/s3/metrics/S3GatewayMetrics.java  | 320 +++++++++++++
 .../hadoop/ozone/s3/metrics/package-info.java      |  23 +-
 .../hadoop/ozone/client/OzoneBucketStub.java       |   2 +-
 .../ozone/s3/metrics/TestS3GatewayMetrics.java     | 113 +++++
 .../apache/hadoop/ozone/s3/util/TestS3Utils.java   |   4 +-
 .../apache/hadoop/ozone/debug/ChunkKeyHandler.java |   3 +-
 .../apache/hadoop/ozone/debug/ReadReplicas.java    |   5 +-
 .../ozone/freon/LeaderAppendLogEntryGenerator.java |   2 +-
 .../apache/hadoop/ozone/freon/OmKeyGenerator.java  |   2 +-
 .../hadoop/ozone/freon/SCMThroughputBenchmark.java |   2 +-
 .../freon/containergenerator/GeneratorOm.java      |   5 +-
 .../freon/containergenerator/GeneratorScm.java     |   4 +-
 .../apache/hadoop/ozone/fsck/ContainerMapper.java  |   5 +-
 .../apache/hadoop/ozone/freon/TestProgressBar.java |   6 +-
 pom.xml                                            |  85 +++-
 580 files changed, 11454 insertions(+), 4601 deletions(-)

diff --cc hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 2e49f02,a6f22d4..07a444a
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@@ -212,10 -219,9 +212,10 @@@ public class BlockInputStream extends B
    protected List<ChunkInfo> getChunkInfos() throws IOException {
      // irrespective of the container state, we will always read via Standalone
      // protocol.
 -    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
 +    if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
 +        .getType() != HddsProtos.ReplicationType.EC) {
        pipeline = Pipeline.newBuilder(pipeline)
-           .setReplicationConfig(new StandaloneReplicationConfig(
+           .setReplicationConfig(StandaloneReplicationConfig.getInstance(
                ReplicationConfig
                    .getLegacyFactor(pipeline.getReplicationConfig())))
            .build();
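
Note: the hunk above teaches BlockInputStream to leave EC pipelines untouched (reads for both STAND_ALONE and EC go out as-is) and switches the fallback conversion to the factory-style StandaloneReplicationConfig.getInstance. A minimal sketch of the resulting logic, using only names visible in this diff; it is an illustration, not the committed method:

    HddsProtos.ReplicationType type = pipeline.getType();
    if (type != HddsProtos.ReplicationType.STAND_ALONE
        && type != HddsProtos.ReplicationType.EC) {
      // Chunk metadata is always read over the Standalone protocol, so map
      // any other replication config to its legacy standalone equivalent.
      pipeline = Pipeline.newBuilder(pipeline)
          .setReplicationConfig(StandaloneReplicationConfig.getInstance(
              ReplicationConfig.getLegacyFactor(
                  pipeline.getReplicationConfig())))
          .build();
    }
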
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 1e2264b,0071924..e99b6a5
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@@ -25,7 -25,7 +25,8 @@@ import java.time.Instant
  import java.util.Arrays;
  import java.util.Comparator;
  
+ import com.fasterxml.jackson.annotation.JsonProperty;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 51c0446,fa87ab2..4b7fda9
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@@ -466,19 -449,12 +454,19 @@@ public final class OzoneConfigKeys 
    public static final String OZONE_CLIENT_TEST_OFS_BUCKET_LAYOUT_DEFAULT =
        "FILE_SYSTEM_OPTIMIZED";
  
-   public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY =
-       "ozone.om.client.protocol.version";
-   // The version of the protocol for Client (S3G/OFS) to OM Communication.
-   // The protocol starts at 2.0.0 and a null or empty value for older versions.
-   public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION = "2.0.0";
+   public static final String OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY =
+       "ozone.client.required.om.version.min";
+ 
+   public static final String OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT =
+       OzoneManagerVersion.S3G_PERSISTENT_CONNECTIONS.name();
  
 +  public static final String
 +      OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_MS =
 +      "ozone.client.bucket.replication.config.refresh.time.ms";
 +  public static final long
 +      OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS =
 +      300 * 1000;
 +
    /**
     * There is no need to instantiate this class.
     */
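
Note: besides replacing the old ozone.om.client.protocol.version key with ozone.client.required.om.version.min (defaulting to OzoneManagerVersion.S3G_PERSISTENT_CONNECTIONS), the hunk above adds a bucket replication-config refresh period with a 300-second default. A hedged sketch of how a client might read the new key, assuming only the constants shown in the hunk and the standard Configuration.getLong accessor:

    OzoneConfiguration conf = new OzoneConfiguration();
    long refreshMs = conf.getLong(
        OzoneConfigKeys.OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_MS,
        OzoneConfigKeys
            .OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS);
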
diff --cc hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
index 4d874da,0000000..9ec19bf
mode 100644,000000..100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
@@@ -1,86 -1,0 +1,86 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +package org.apache.hadoop.hdds.scm.container;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
 +
 +/**
 + * Tests for the ContainerInfo class.
 + */
 +
 +public class TestContainerInfo {
 +
 +  @Test
 +  public void getProtobufMessageEC() throws IOException {
 +    ContainerInfo container =
-         createContainerInfo(new RatisReplicationConfig(THREE));
++        createContainerInfo(RatisReplicationConfig.getInstance(THREE));
 +    HddsProtos.ContainerInfoProto proto = container.getProtobuf();
 +
 +    // No EC Config
 +    Assert.assertFalse(proto.hasEcReplicationConfig());
 +    Assert.assertEquals(THREE, proto.getReplicationFactor());
 +    Assert.assertEquals(RATIS, proto.getReplicationType());
 +
 +    // Reconstruct object from Proto
 +    ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
 +    Assert.assertEquals(RATIS, recovered.getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof RatisReplicationConfig);
 +
 +    // EC Config
 +    container = createContainerInfo(new ECReplicationConfig(3, 2));
 +    proto = container.getProtobuf();
 +
 +    Assert.assertEquals(3, proto.getEcReplicationConfig().getData());
 +    Assert.assertEquals(2, proto.getEcReplicationConfig().getParity());
 +    Assert.assertFalse(proto.hasReplicationFactor());
 +    Assert.assertEquals(EC, proto.getReplicationType());
 +
 +    // Reconstruct object from Proto
 +    recovered = ContainerInfo.fromProtobuf(proto);
 +    Assert.assertEquals(EC, recovered.getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof ECReplicationConfig);
 +    ECReplicationConfig config =
 +        (ECReplicationConfig)recovered.getReplicationConfig();
 +    Assert.assertEquals(3, config.getData());
 +    Assert.assertEquals(2, config.getParity());
 +  }
 +
 +  private ContainerInfo createContainerInfo(ReplicationConfig repConfig) {
 +    ContainerInfo.Builder builder = new ContainerInfo.Builder();
 +    builder.setContainerID(1234)
 +        .setReplicationConfig(repConfig)
 +        .setPipelineID(PipelineID.randomId())
 +        .setState(HddsProtos.LifeCycleState.OPEN)
 +        .setOwner("scm");
 +    return builder.build();
 +  }
 +}
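
Note: the new test above verifies the EC-aware protobuf round-trip: EC containers carry data/parity in EcReplicationConfig while Ratis containers keep the legacy replication factor. A short consumer-side sketch of the same pattern, built only from names appearing in the test and illustrative beyond that:

    ContainerInfo info = ContainerInfo.fromProtobuf(proto);
    ReplicationConfig rc = info.getReplicationConfig();
    if (rc instanceof ECReplicationConfig) {
      ECReplicationConfig ec = (ECReplicationConfig) rc;
      // e.g. "EC 3+2" for a container created with ECReplicationConfig(3, 2)
      System.out.println("EC " + ec.getData() + "+" + ec.getParity());
    }
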
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 72e9b92,ba131ff..fdf44ed
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@@ -681,10 -751,9 +751,10 @@@ public class KeyValueContainer implemen
          .setWriteCount(containerData.getWriteCount())
          .setReadBytes(containerData.getReadBytes())
          .setWriteBytes(containerData.getWriteBytes())
-         .setKeyCount(containerData.getKeyCount())
+         .setKeyCount(containerData.getBlockCount())
          .setUsed(containerData.getBytesUsed())
          .setState(getHddsState())
 +        .setReplicaIndex(containerData.getReplicaIndex())
          .setDeleteTransactionId(containerData.getDeleteTransactionId())
          .setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
          .setOriginNodeId(containerData.getOriginNodeId());
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 0ba9f3b,e1c19f7..a14be22
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@@ -115,9 -113,6 +116,8 @@@ import java.util.Map
  import java.util.Optional;
  import java.util.function.Consumer;
  
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
- import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
 +
  /**
   * This class is the client-side translator to translate the requests made on
   * the {@link StorageContainerLocationProtocol} interface to the RPC server
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index b46bc79,9b94e20..dd9f71a
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@@ -110,11 -129,10 +130,9 @@@ public class AbstractContainerReportHan
     * @throws ContainerNotFoundException If the container is not present
     */
    private void updateContainerStats(final DatanodeDetails datanodeDetails,
-                                     final ContainerID containerId,
+                                     final ContainerInfo containerInfo,
                                      final ContainerReplicaProto replicaProto)
        throws ContainerNotFoundException {
-     final ContainerInfo containerInfo = containerManager
-         .getContainer(containerId);
 -    final ContainerID containerId = containerInfo.containerID();
  
      if (isHealthy(replicaProto::getState)) {
        if (containerInfo.getSequenceId() <
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index e62a8b4,1c39efd..e4076d9
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@@ -255,19 -270,13 +272,19 @@@ public class ContainerManagerImpl imple
          .setOwner(owner)
          .setContainerID(containerID.getId())
          .setDeleteTransactionId(0)
 -        .setReplicationFactor(
 -            ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()))
 -        .setReplicationType(pipeline.getType())
 -        .build();
 -    containerStateManager.addContainer(containerInfo);
 +        .setReplicationType(pipeline.getType());
 +
 +    if (pipeline.getReplicationConfig() instanceof ECReplicationConfig) {
 +      containerInfoBuilder.setEcReplicationConfig(
 +          ((ECReplicationConfig) pipeline.getReplicationConfig()).toProto());
 +    } else {
 +      containerInfoBuilder.setReplicationFactor(
 +          ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()));
 +    }
 +
 +    containerStateManager.addContainer(containerInfoBuilder.build());
      scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
-     return containerStateManager.getContainer(containerID.getProtobuf());
+     return containerStateManager.getContainer(containerID);
    }
  
    @Override
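
Note: ContainerInfo creation now branches on the pipeline's replication config: EC configs are serialized through their own proto field, while everything else still goes through the legacy replication factor. End to end, an EC allocation looks like the call added to TestContainerManagerImpl below (the owner string "admin" is just the test's value):

    ContainerInfo container = containerManager.allocateContainer(
        new ECReplicationConfig(3, 2), "admin");
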
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index bba6edb,2168138..19b7a51
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@@ -177,10 -167,9 +177,10 @@@ public class PipelineManagerImpl implem
  
      acquireWriteLock();
      try {
 -      Pipeline pipeline = pipelineFactory.create(replicationConfig);
 +      Pipeline pipeline = pipelineFactory.create(replicationConfig,
 +          excludedNodes, favoredNodes);
        stateManager.addPipeline(pipeline.getProtobufMessage(
-           ClientVersions.CURRENT_VERSION));
+           ClientVersion.CURRENT_VERSION));
        recordMetricsForPipeline(pipeline);
        return pipeline;
      } catch (IOException ex) {
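
Note: pipeline creation now threads excluded and favored node lists through to the pipeline factory, and persists state under the renamed ClientVersion enum. A hedged sketch of the provider-level call, mirroring the usage added to TestRatisPipelineProvider further down in this diff (Ratis placement is hardcoded to PipelinePlacementPolicy, which ignores favored nodes):

    Pipeline pipeline = provider.create(
        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
        excludedNodes,             // datanodes the placement policy must avoid
        Collections.emptyList());  // favored nodes; ignored by Ratis placement
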
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 470136a,d1e97fe..ee0b637
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@@ -155,14 -154,19 +155,28 @@@ public class TestContainerManagerImpl 
          containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size());
      Assert.assertEquals(2, containerManager
          .getContainers(HddsProtos.LifeCycleState.CLOSING).size());
+     containerManager.updateContainerState(cidArray[1],
+         HddsProtos.LifeCycleEvent.QUASI_CLOSE);
+     containerManager.updateContainerState(cidArray[2],
+         HddsProtos.LifeCycleEvent.FINALIZE);
+     containerManager.updateContainerState(cidArray[2],
+         HddsProtos.LifeCycleEvent.CLOSE);
+     Assert.assertEquals(7, containerManager.
+         getContainerStateCount(HddsProtos.LifeCycleState.OPEN));
+     Assert.assertEquals(1, containerManager
+         .getContainerStateCount(HddsProtos.LifeCycleState.CLOSING));
+     Assert.assertEquals(1, containerManager
+         .getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED));
+     Assert.assertEquals(1, containerManager
+         .getContainerStateCount(HddsProtos.LifeCycleState.CLOSED));
    }
 +
 +  @Test
 +  public void testAllocateContainersWithECReplicationConfig() throws Exception {
 +    final ContainerInfo admin = containerManager
 +        .allocateContainer(new ECReplicationConfig(3, 2), "admin");
 +    Assert.assertEquals(1, containerManager.getContainers().size());
 +    Assert.assertNotNull(containerManager.getContainer(admin.containerID()));
 +  }
 +
  }
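
Note: the expanded test exercises two additions: a getContainerStateCount accessor on the container manager and EC-aware allocateContainer. A hypothetical reporting loop built on the accessor (the method name comes from the test above; the loop itself is illustrative only):

    for (HddsProtos.LifeCycleState state
        : HddsProtos.LifeCycleState.values()) {
      System.out.println(state + ": "
          + containerManager.getContainerStateCount(state));
    }
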
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 00d96dc,c82006e..523efc0
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@@ -28,13 -26,10 +28,13 @@@ import org.apache.hadoop.hdds.scm.metad
  import org.apache.hadoop.hdds.scm.node.NodeManager;
  import org.apache.hadoop.hdds.utils.db.DBStore;
  import org.apache.hadoop.hdds.utils.db.Table;
- import org.apache.hadoop.ozone.ClientVersions;
+ import org.apache.hadoop.ozone.ClientVersion;
  
  import java.io.IOException;
 +import java.util.ArrayList;
  import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
  import java.util.List;
  import java.util.Map;
  import java.util.NavigableSet;
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 3152c39,3a68db1..fd29929
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@@ -224,19 -220,15 +226,21 @@@ public class TestPipelineManagerImpl 
      Assert.assertEquals(Pipeline.PipelineState.DORMANT,
          pipelineStore.get(pipeline.getId()).getPipelineState());
      Assert.assertFalse(pipelineManager
-         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
+         .getPipelines(RatisReplicationConfig
+             .getInstance(ReplicationFactor.THREE),
              Pipeline.PipelineState.OPEN).contains(pipeline));
-     Assert.assertEquals(1, pipelineManager
-         .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
++    Assert.assertEquals(1, pipelineManager.getPipelineCount(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
 +            Pipeline.PipelineState.DORMANT));
  
      pipelineManager.activatePipeline(pipeline.getId());
      Assert.assertTrue(pipelineManager
-         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
+         .getPipelines(RatisReplicationConfig
+             .getInstance(ReplicationFactor.THREE),
              Pipeline.PipelineState.OPEN).contains(pipeline));
-     Assert.assertEquals(1, pipelineManager
-         .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
++    Assert.assertEquals(1, pipelineManager.getPipelineCount(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
 +            Pipeline.PipelineState.OPEN));
      buffer.flush();
      Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
      pipelineManager.close();
@@@ -715,43 -724,6 +736,43 @@@
      assertTrue(containerLogIdx < pipelineLogIdx);
    }
  
 +  @Test
 +  public void testCreatePipelineForRead() throws IOException {
 +    PipelineManager pipelineManager = createPipelineManager(true);
 +    List<DatanodeDetails> dns = nodeManager
 +        .getNodes(NodeStatus.inServiceHealthy())
 +        .stream()
 +        .limit(3)
 +        .collect(Collectors.toList());
 +    Set<ContainerReplica> replicas = createContainerReplicasList(dns);
 +    Pipeline pipeline = pipelineManager.createPipelineForRead(
-         new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE), replicas);
 +    Assert.assertEquals(3, pipeline.getNodes().size());
 +    for (DatanodeDetails dn : pipeline.getNodes())  {
 +      Assert.assertTrue(dns.contains(dn));
 +    }
 +  }
 +
 +  private Set<ContainerReplica> createContainerReplicasList(
 +      List <DatanodeDetails> dns) {
 +    Set<ContainerReplica> replicas = new HashSet<>();
 +    for (DatanodeDetails dn : dns) {
 +      ContainerReplica r = ContainerReplica.newBuilder()
 +          .setBytesUsed(1)
 +          .setContainerID(ContainerID.valueOf(1))
 +          .setContainerState(StorageContainerDatanodeProtocolProtos
 +              .ContainerReplicaProto.State.CLOSED)
 +          .setKeyCount(1)
 +          .setOriginNodeId(UUID.randomUUID())
 +          .setSequenceId(1)
 +          .setReplicaIndex(0)
 +          .setDatanodeDetails(dn)
 +          .build();
 +      replicas.add(r);
 +    }
 +    return replicas;
 +  }
 +
    private void sendPipelineReport(
        DatanodeDetails dn, Pipeline pipeline,
        PipelineReportHandler pipelineReportHandler,
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
index 11ae86c,0000000..8168ce8
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
@@@ -1,95 -1,0 +1,99 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.hdds.scm.pipeline;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.junit.After;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.junit.Assert.assertEquals;
 +
 +/**
 + * Tests for PipelineStateMap.
 + */
 +
 +public class TestPipelineStateMap {
 +
 +  private PipelineStateMap map;
 +
 +  @Before
 +  public void setup() {
 +    map = new PipelineStateMap();
 +  }
 +
 +  @After
 +  public void teardown() throws IOException {
 +  }
 +
 +  @Test
 +  public void testCountPipelines() throws IOException {
 +    Pipeline p;
 +
 +    // Open Standalone Pipelines
 +    map.addPipeline(MockPipeline.createPipeline(1));
 +    map.addPipeline(MockPipeline.createPipeline(1));
 +    p = MockPipeline.createPipeline(1);
 +    map.addPipeline(p);
 +    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
 +
 +    // Ratis pipeline
 +    map.addPipeline(MockPipeline.createRatisPipeline());
 +    p = MockPipeline.createRatisPipeline();
 +    map.addPipeline(p);
 +    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
 +
 +    // EC Pipelines
 +    map.addPipeline(MockPipeline.createEcPipeline(
 +        new ECReplicationConfig(3, 2)));
 +    map.addPipeline(MockPipeline.createEcPipeline(
 +        new ECReplicationConfig(3, 2)));
 +    p = MockPipeline.createEcPipeline(new ECReplicationConfig(3, 2));
 +    map.addPipeline(p);
 +    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
 +
-     assertEquals(2, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
++    assertEquals(2, map.getPipelineCount(
++        StandaloneReplicationConfig.getInstance(ONE),
 +        Pipeline.PipelineState.OPEN));
-     assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
++    assertEquals(1, map.getPipelineCount(
++        RatisReplicationConfig.getInstance(THREE),
 +        Pipeline.PipelineState.OPEN));
 +    assertEquals(2, map.getPipelineCount(new ECReplicationConfig(3, 2),
 +        Pipeline.PipelineState.OPEN));
 +
 +    assertEquals(0, map.getPipelineCount(new ECReplicationConfig(6, 3),
 +        Pipeline.PipelineState.OPEN));
 +
-     assertEquals(1, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
++    assertEquals(1, map.getPipelineCount(
++        StandaloneReplicationConfig.getInstance(ONE),
 +        Pipeline.PipelineState.CLOSED));
-     assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
++    assertEquals(1, map.getPipelineCount(
++        RatisReplicationConfig.getInstance(THREE),
 +        Pipeline.PipelineState.CLOSED));
 +    assertEquals(1, map.getPipelineCount(new ECReplicationConfig(3, 2),
 +        Pipeline.PipelineState.CLOSED));
 +  }
 +
 +
 +}
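
Note: the new test pins down that pipeline counts are keyed by the full ReplicationConfig, so EC schemes with different data/parity layouts (rs-3-2 vs rs-6-3) are tracked separately from Ratis and Standalone pipelines. A compact sketch of the counting calls, taken directly from the assertions above:

    int openEc = map.getPipelineCount(new ECReplicationConfig(3, 2),
        Pipeline.PipelineState.OPEN);
    int openRatis = map.getPipelineCount(
        RatisReplicationConfig.getInstance(THREE),
        Pipeline.PipelineState.OPEN);
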
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index fa8da28,d2b19fb..c0ce8d4
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@@ -220,17 -219,15 +224,20 @@@ public class TestRatisPipelineProvider 
      List<DatanodeDetails> healthyNodes = nodeManager
          .getNodes(NodeStatus.inServiceHealthy()).stream()
          .limit(3).collect(Collectors.toList());
 +    Set<ContainerReplica> replicas = createContainerReplicas(healthyNodes);
  
      Pipeline pipeline1 = provider.create(
-         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+         RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+         healthyNodes);
      Pipeline pipeline2 = provider.create(
-         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+         RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+         healthyNodes);
 +    Pipeline pipeline3 = provider.createForRead(
-         new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
++        replicas);
  
      Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
 +    Assert.assertEquals(pipeline2.getNodeSet(), pipeline3.getNodeSet());
      cleanup();
    }
  
@@@ -284,31 -281,6 +291,31 @@@
    }
  
    @Test
 +  // Test excluded nodes work correctly. Note that for Ratis, the
 +  // PipelinePlacementPolicy, which Ratis is hardcoded to use, does not consider
 +  // favored nodes.
 +  public void testCreateFactorTHREEPipelineWithExcludedDatanodes()
 +      throws Exception {
 +    init(1);
 +    int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy())
 +        .size();
 +    // Add all but 3 nodes to the exclude list and ensure that the 3 picked
 +    // nodes are not in the excluded list.
 +    List<DatanodeDetails> excludedNodes = nodeManager
 +        .getNodes(NodeStatus.inServiceHealthy()).stream()
 +        .limit(healthyCount - 3).collect(Collectors.toList());
 +
 +    Pipeline pipeline1 = provider.create(
-         new RatisReplicationConfig(ReplicationFactor.THREE), excludedNodes,
-         Collections.EMPTY_LIST);
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
++        excludedNodes, Collections.EMPTY_LIST);
 +
 +    for (DatanodeDetails dn : pipeline1.getNodes()) {
 +      assertFalse(excludedNodes.contains(dn));
 +    }
 +  }
 +
 +
 +  @Test
    public void testCreatePipelinesWhenNotEnoughSpace() throws Exception {
      String expectedErrorSubstring = "Unable to find enough" +
          " nodes that meet the space requirement";
@@@ -319,11 -291,8 +326,11 @@@
      largeContainerConf.set(OZONE_SCM_CONTAINER_SIZE, "100TB");
      init(1, largeContainerConf);
      for (ReplicationFactor factor: ReplicationFactor.values()) {
 +      if (factor == ReplicationFactor.ZERO) {
 +        continue;
 +      }
        try {
-         provider.create(new RatisReplicationConfig(factor));
+         provider.create(RatisReplicationConfig.getInstance(factor));
          Assert.fail("Expected SCMException for large container size with " +
              "replication factor " + factor.toString());
        } catch (SCMException ex) {
@@@ -335,11 -304,8 +342,11 @@@
      largeMetadataConf.set(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, "100TB");
      init(1, largeMetadataConf);
      for (ReplicationFactor factor: ReplicationFactor.values()) {
 +      if (factor == ReplicationFactor.ZERO) {
 +        continue;
 +      }
        try {
-         provider.create(new RatisReplicationConfig(factor));
+         provider.create(RatisReplicationConfig.getInstance(factor));
          Assert.fail("Expected SCMException for large metadata size with " +
              "replication factor " + factor.toString());
        } catch (SCMException ex) {
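
Both loops above gain a guard for the new ReplicationFactor.ZERO enum value. A hedged sketch of the guard in isolation; the rationale in the comment is an inference from the EC branch context (EC keys carry no Ratis-style factor), not something this diff states:

    // Inside a test body: exercise every proto factor except ZERO, for which
    // no Ratis pipeline can be created. ZERO is presumed to be the EC
    // placeholder, where a replication factor does not apply.
    for (ReplicationFactor factor : ReplicationFactor.values()) {
      if (factor == ReplicationFactor.ZERO) {
        continue;
      }
      provider.create(RatisReplicationConfig.getInstance(factor));
    }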
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index b3ce216,0000000..ae997a3
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@@ -1,446 -1,0 +1,446 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.hadoop.hdds.scm.pipeline;
 +
 +import org.apache.hadoop.hdds.HddsConfigKeys;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.hadoop.hdds.scm.container.ContainerID;
 +import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 +import org.apache.hadoop.hdds.scm.container.ContainerManager;
 +import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
 +import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 +import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 +import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
- import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 +import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
++import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 +import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 +import org.apache.hadoop.hdds.scm.node.NodeManager;
 +import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
 +import org.apache.hadoop.hdds.utils.db.DBStore;
 +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
 +import org.apache.ozone.test.GenericTestUtils;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.mockito.Matchers;
 +import org.mockito.Mockito;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.NavigableSet;
 +import java.util.Set;
 +import java.util.UUID;
 +
 +import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
 +import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
 +import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
 +import static org.junit.Assert.assertEquals;
 +import static org.junit.Assert.assertFalse;
 +import static org.junit.Assert.assertNotEquals;
 +import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertNull;
 +import static org.junit.Assert.assertTrue;
 +import static org.mockito.Mockito.verify;
 +
 +/**
 + * Tests to validate that the WritableECContainerProvider works correctly.
 + */
 +public class TestWritableECContainerProvider {
 +
 +  private static final Logger LOG = LoggerFactory
 +      .getLogger(TestWritableECContainerProvider.class);
 +  private static final String OWNER = "SCM";
 +  private PipelineManager pipelineManager;
 +  private ContainerManager containerManager
 +      = Mockito.mock(ContainerManager.class);
 +  private PipelineChoosePolicy pipelineChoosingPolicy
 +      = new HealthyPipelineChoosePolicy();
 +
 +  private OzoneConfiguration conf;
 +  private DBStore dbStore;
 +  private SCMHAManager scmhaManager;
 +  private NodeManager nodeManager;
 +  private WritableContainerProvider provider;
 +  private ReplicationConfig repConfig;
 +  private int minPipelines;
 +
 +  private Map<ContainerID, ContainerInfo> containers;
 +
 +  @Before
 +  public void setup() throws IOException {
 +    repConfig = new ECReplicationConfig(3, 2);
 +    conf = new OzoneConfiguration();
 +    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
 +        conf.getObject(WritableECContainerProvider
 +            .WritableECContainerProviderConfig.class);
 +    minPipelines = providerConf.getMinimumPipelines();
 +    containers = new HashMap<>();
 +    File testDir = GenericTestUtils.getTestDir(
 +        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
 +    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
 +    dbStore = DBStoreBuilder.createDBStore(
 +        conf, new SCMDBDefinition());
-     scmhaManager = MockSCMHAManager.getInstance(true);
++    scmhaManager = SCMHAManagerStub.getInstance(true);
 +    nodeManager = new MockNodeManager(true, 10);
 +    pipelineManager =
 +        new MockPipelineManager(dbStore, scmhaManager, nodeManager);
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    Mockito.doAnswer(call -> {
 +      Pipeline pipeline = (Pipeline)call.getArguments()[2];
 +      ContainerInfo container = createContainer(pipeline,
 +          repConfig, System.nanoTime());
 +      pipelineManager.addContainerToPipeline(
 +          pipeline.getId(), container.containerID());
 +      containers.put(container.containerID(), container);
 +      return container;
 +    }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
 +        Matchers.anyString(), Matchers.any(Pipeline.class));
 +
 +    Mockito.doAnswer(call ->
 +        containers.get((ContainerID)call.getArguments()[0]))
 +        .when(containerManager).getContainer(Matchers.any(ContainerID.class));
 +
 +  }
 +
 +  @Test
 +  public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
 +      throws IOException {
 +    // The first minPipelines calls should each return a different container
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +
 +    allocatedContainers.clear();
 +    for (int i = 0; i < 20; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // Should have minPipelines containers created
 +    assertEquals(minPipelines,
 +        pipelineManager.getPipelines(repConfig, OPEN).size());
 +    // We should have more than 2 allocatedContainers in the set, proving a
 +    // random container is selected each time. Do not check for minPipelines
 +    // here, as in 20 turns there is a fair chance we don't pick every pipeline.
 +    assertTrue(allocatedContainers.size() > 2);
 +  }
 +
 +  @Test
 +  public void testPipelineLimitIgnoresExcludedPipelines() throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // We have the min limit of pipelines, but then exclude one. It should use
 +    // one of the existing rather than creating a new one, as the limit is
 +    // checked against all pipelines, not just the filtered list
 +    ExcludeList exclude = new ExcludeList();
 +    PipelineID excludedID = allocatedContainers
 +        .stream().findFirst().get().getPipelineID();
 +    exclude.addPipeline(excludedID);
 +
 +    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
 +    assertNotEquals(excludedID, c.getPipelineID());
 +    assertTrue(allocatedContainers.contains(c));
 +  }
 +
 +  @Test
 +  public void testNewPipelineCreatedIfAllPipelinesExcluded()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // We have the min limit of pipelines, but then exclude them all. As every
 +    // existing pipeline is excluded, a new pipeline should be created even
 +    // though the minimum pipeline limit has already been reached
 +    ExcludeList exclude = new ExcludeList();
 +    for (ContainerInfo c : allocatedContainers) {
 +      exclude.addPipeline(c.getPipelineID());
 +    }
 +    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
 +    assertFalse(allocatedContainers.contains(c));
 +  }
 +
 +  @Test
 +  public void testNewPipelineCreatedIfAllContainersExcluded()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      allocatedContainers.add(container);
 +    }
 +    // We have the min limit of pipelines, but then exclude every container.
 +    // As all existing containers are excluded, a new container (and hence a
 +    // new pipeline) should be created
 +    ExcludeList exclude = new ExcludeList();
 +    for (ContainerInfo c : allocatedContainers) {
 +      exclude.addConatinerId(c.containerID());
 +    }
 +    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
 +    assertFalse(allocatedContainers.contains(c));
 +  }
 +
 +  @Test
 +  public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
-     pipelineManager =
-         new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++    pipelineManager = new MockPipelineManager(
++        dbStore, scmhaManager, nodeManager) {
 +      @Override
 +      public Pipeline createPipeline(ReplicationConfig repConf,
 +          List<DatanodeDetails> excludedNodes,
 +          List<DatanodeDetails> favoredNodes) throws IOException {
 +        throw new IOException("Cannot create pipelines");
 +      }
 +    };
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    ContainerInfo container =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    assertNull(container);
 +  }
 +
 +  @Test
 +  public void testExistingPipelineReturnedWhenNewCannotBeCreated()
 +      throws IOException {
-     pipelineManager =
-         new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++    pipelineManager = new MockPipelineManager(
++        dbStore, scmhaManager, nodeManager) {
 +
 +      private boolean throwError = false;
 +
 +      @Override
 +      public Pipeline createPipeline(ReplicationConfig repConf,
 +          List<DatanodeDetails> excludedNodes,
 +          List<DatanodeDetails> favoredNodes) throws IOException {
 +        if (throwError) {
 +          throw new IOException("Cannot create pipelines");
 +        }
 +        throwError = true;
 +        return super.createPipeline(repConfig);
 +      }
 +    };
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    ContainerInfo container =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    for (int i = 0; i < 5; i++) {
 +      ContainerInfo nextContainer =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertEquals(container, nextContainer);
 +    }
 +  }
 +
 +  @Test
 +  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +    // Update all the containers to make them nearly full, but with enough space
 +    // for an EC block to be striped across them.
 +    for (ContainerInfo c : allocatedContainers) {
 +      c.setUsedBytes(getMaxContainerSize() - 30 * 1024 * 1024);
 +    }
 +
 +    // Get a new container of size 50MB and ensure it is one of the original
 +    // set. We ask for 50MB of space, but as the data is striped across the EC
 +    // group, each container only needs 50MB / dataNum of space
 +    ContainerInfo newContainer =
 +        provider.getContainer(50 * 1024 * 1024, repConfig, OWNER,
 +            new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertTrue(allocatedContainers.contains(newContainer));
 +    // Now get a new container where there is not enough space in the existing
 +    // and ensure a new container gets created.
 +    newContainer = provider.getContainer(
 +        128 * 1024 * 1024, repConfig, OWNER, new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertFalse(allocatedContainers.contains(newContainer));
 +    // The original pipelines should all be closed, triggered by the lack of
 +    // space.
 +    for (ContainerInfo c : allocatedContainers) {
 +      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
 +      assertEquals(CLOSED, pipeline.getPipelineState());
 +    }
 +  }
 +
 +  @Test
 +  public void testPipelineNotFoundWhenAttemptingToUseExisting()
 +      throws IOException {
 +    // Ensure the PipelineManager throws PipelineNotFoundException when we ask
 +    // for the containers in the pipeline
-     pipelineManager =
-         new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++    pipelineManager = new MockPipelineManager(
++        dbStore, scmhaManager, nodeManager) {
 +
 +      @Override
 +      public NavigableSet<ContainerID> getContainersInPipeline(
 +          PipelineID pipelineID) throws IOException {
 +        throw new PipelineNotFoundException("Simulated exception");
 +      }
 +    };
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
 +
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +    // Now attempt to get a container - any attempt to use an existing pipeline
 +    // will throw PipelineNotFoundException, so a new one must be allocated
 +    ContainerInfo newContainer =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertFalse(allocatedContainers.contains(newContainer));
 +  }
 +
 +  @Test
 +  public void testContainerNotFoundWhenAttemptingToUseExisting()
 +      throws IOException {
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container =
 +          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +    }
 +
 +    // Ensure ContainerManager always throws when a container is requested so
 +    // existing pipelines cannot be used
 +    Mockito.doAnswer(call -> {
 +      throw new ContainerNotFoundException();
 +    }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
 +
 +    ContainerInfo newContainer =
 +        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
 +    assertNotNull(newContainer);
 +    assertFalse(allocatedContainers.contains(newContainer));
 +
 +    // Ensure all the existing pipelines are closed
 +    for (ContainerInfo c : allocatedContainers) {
 +      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
 +      assertEquals(CLOSED, pipeline.getPipelineState());
 +    }
 +  }
 +
 +  @Test
 +  public void testPipelineOpenButContainerRemovedFromIt() throws IOException {
 +    // This can happen if the container close process is triggered from the DN.
 +    // When that happens, the ContainerManager will change the container state
 +    // to CLOSING and remove it from the container list in the PipelineManager.
 +    Set<ContainerInfo> allocatedContainers = new HashSet<>();
 +    for (int i = 0; i < minPipelines; i++) {
 +      ContainerInfo container = provider.getContainer(
 +          1, repConfig, OWNER, new ExcludeList());
 +      assertFalse(allocatedContainers.contains(container));
 +      allocatedContainers.add(container);
 +      // Remove the container from the pipeline to simulate closing it
 +      pipelineManager.removeContainerFromPipeline(
 +          container.getPipelineID(), container.containerID());
 +    }
 +    ContainerInfo newContainer = provider.getContainer(
 +        1, repConfig, OWNER, new ExcludeList());
 +    assertFalse(allocatedContainers.contains(newContainer));
 +    for (ContainerInfo c : allocatedContainers) {
 +      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
 +      assertEquals(CLOSED, pipeline.getPipelineState());
 +    }
 +  }
 +
 +  @Test
 +  public void testExcludedNodesPassedToCreatePipelineIfProvided()
 +      throws IOException {
 +    PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
 +    provider = new WritableECContainerProvider(
 +        conf, pipelineManagerSpy, containerManager, pipelineChoosingPolicy);
 +    ExcludeList excludeList = new ExcludeList();
 +
 +    // An empty list should be passed if there are no nodes excluded.
 +    ContainerInfo container = provider.getContainer(
 +        1, repConfig, OWNER, excludeList);
 +    assertNotNull(container);
 +
 +    verify(pipelineManagerSpy).createPipeline(repConfig,
 +        Collections.emptyList(), Collections.emptyList());
 +
 +    // If nodes are excluded then the excluded nodes should be passed through to
 +    // the create pipeline call.
 +    excludeList.addDatanode(MockDatanodeDetails.randomDatanodeDetails());
 +    List<DatanodeDetails> excludedNodes =
 +        new ArrayList<>(excludeList.getDatanodes());
 +
 +    container = provider.getContainer(
 +        1, repConfig, OWNER, excludeList);
 +    assertNotNull(container);
 +    verify(pipelineManagerSpy).createPipeline(repConfig, excludedNodes,
 +        Collections.emptyList());
 +  }
 +
 +  private ContainerInfo createContainer(Pipeline pipeline,
 +      ReplicationConfig repConf, long containerID) {
 +    return new ContainerInfo.Builder()
 +        .setContainerID(containerID)
 +        .setOwner(OWNER)
 +        .setReplicationConfig(repConf)
 +        .setState(HddsProtos.LifeCycleState.OPEN)
 +        .setPipelineID(pipeline.getId())
 +        .setNumberOfKeys(0)
 +        .setUsedBytes(0)
 +        .setSequenceId(0)
 +        .setDeleteTransactionId(0)
 +        .build();
 +  }
 +
 +  private long getMaxContainerSize() {
 +    return (long)conf.getStorageSize(
 +        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
 +        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
 +  }
 +
 +}
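
Taken together, the tests above pin down the provider's contract: pipelines are created lazily up to the configured minimum, an open container on a random qualifying pipeline is returned after that, and null is returned only when no pipeline can be used or created. A hedged usage sketch reusing the test's own types; the request size and owner string are illustrative only:

    // Request a writable EC container for a 50MB write, excluding nothing.
    ReplicationConfig repConfig = new ECReplicationConfig(3, 2);
    ContainerInfo container = provider.getContainer(
        50 * 1024 * 1024, repConfig, "SCM", new ExcludeList());
    if (container == null) {
      // No EC pipeline could be used or created; the caller must fall back
      // or fail the write.
    }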
diff --cc hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index 2ea34b9,a7960d8..10b5758
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@@ -246,26 -207,6 +246,26 @@@ public class TestInfoSubCommand 
      return new ContainerWithPipeline(container, pipeline);
    }
  
 +  private ContainerWithPipeline getECContainerWithPipeline() {
 +    Pipeline pipeline = new Pipeline.Builder()
 +        .setState(Pipeline.PipelineState.CLOSED)
 +        .setReplicationConfig(new ECReplicationConfig(3, 2))
 +        .setId(PipelineID.randomId())
 +        .setNodes(datanodes)
 +        .build();
 +
 +    ContainerInfo container = new ContainerInfo.Builder()
 +        .setSequenceId(1)
 +        .setPipelineID(pipeline.getId())
 +        .setUsedBytes(1234)
-         .setReplicationConfig(new RatisReplicationConfig(THREE))
++        .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
 +        .setNumberOfKeys(1)
 +        .setState(CLOSED)
 +        .build();
 +
 +    return new ContainerWithPipeline(container, pipeline);
 +  }
 +
    private List<DatanodeDetails> createDatanodeDetails(int count) {
      List<DatanodeDetails> dns = new ArrayList<>();
      for (int i = 0; i < count; i++) {
diff --cc hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
index 9d70f3a,0000000..acc3bda
mode 100644,000000..100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
@@@ -1,192 -1,0 +1,192 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +package org.apache.hadoop.hdds.scm.cli.pipeline;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.client.ScmClient;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.junit.After;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +import org.mockito.Mockito;
 +import picocli.CommandLine;
 +
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.PrintStream;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.List;
 +import java.util.UUID;
 +
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.mockito.Mockito.mock;
 +
 +/**
 + * Tests for the ListPipelinesSubcommand class.
 + */
 +public class TestListPipelinesSubCommand {
 +
 +  private ListPipelinesSubcommand cmd;
 +  private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
 +  private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
 +  private final PrintStream originalOut = System.out;
 +  private final PrintStream originalErr = System.err;
 +  private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
 +  private ScmClient scmClient;
 +
 +  @Before
 +  public void setup() throws IOException {
 +    cmd = new ListPipelinesSubcommand();
 +    System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
 +    System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
 +
 +    scmClient = mock(ScmClient.class);
 +    Mockito.when(scmClient.listPipelines())
 +        .thenAnswer(invocation -> createPipelines());
 +  }
 +
 +  @After
 +  public void tearDown() {
 +    System.setOut(originalOut);
 +    System.setErr(originalErr);
 +  }
 +
 +  @Test
 +  public void testAllPipelinesReturnedWithNoFilter() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs();
 +    cmd.execute(scmClient);
 +    Assert.assertEquals(6, outContent.toString(DEFAULT_ENCODING)
 +        .split(System.getProperty("line.separator")).length);
 +  }
 +
 +  @Test
 +  public void testOnlyOpenReturned() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-s", "OPEN");
 +    cmd.execute(scmClient);
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(3, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1, output.indexOf("CLOSED"));
 +  }
 +
 +  @Test(expected = IOException.class)
 +  public void testExceptionIfReplicationWithoutType() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "THREE");
 +    cmd.execute(scmClient);
 +  }
 +
 +  @Test
 +  public void testReplicationAndType() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "THREE", "-t", "RATIS");
 +    cmd.execute(scmClient);
 +
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(2, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1, output.indexOf("EC"));
 +  }
 +
 +  @Test
 +  public void testReplicationAndTypeEC() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "rs-6-3-1024k", "-t", "EC");
 +    cmd.execute(scmClient);
 +
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(1, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1,
 +        output.indexOf("ReplicationConfig: RATIS"));
 +  }
 +
 +  @Test
 +  public void testReplicationAndTypeAndState() throws IOException {
 +    CommandLine c = new CommandLine(cmd);
 +    c.parseArgs("-r", "THREE", "-t", "RATIS", "-s", "OPEN");
 +    cmd.execute(scmClient);
 +
 +    String output = outContent.toString(DEFAULT_ENCODING);
 +    Assert.assertEquals(1, output.split(
 +        System.getProperty("line.separator")).length);
 +    Assert.assertEquals(-1, output.indexOf("CLOSED"));
 +    Assert.assertEquals(-1, output.indexOf("EC"));
 +  }
 +
 +  private List<Pipeline> createPipelines() {
 +    List<Pipeline> pipelines = new ArrayList<>();
-     pipelines.add(createPipeline(
-         new StandaloneReplicationConfig(ONE), Pipeline.PipelineState.OPEN));
-     pipelines.add(createPipeline(
-         new RatisReplicationConfig(THREE), Pipeline.PipelineState.OPEN));
-     pipelines.add(createPipeline(
-         new RatisReplicationConfig(THREE), Pipeline.PipelineState.CLOSED));
++    pipelines.add(createPipeline(StandaloneReplicationConfig.getInstance(ONE),
++        Pipeline.PipelineState.OPEN));
++    pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
++        Pipeline.PipelineState.OPEN));
++    pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
++        Pipeline.PipelineState.CLOSED));
 +
 +    pipelines.add(createPipeline(
 +        new ECReplicationConfig(3, 2), Pipeline.PipelineState.OPEN));
 +    pipelines.add(createPipeline(
 +        new ECReplicationConfig(3, 2), Pipeline.PipelineState.CLOSED));
 +    pipelines.add(createPipeline(
 +        new ECReplicationConfig(6, 3), Pipeline.PipelineState.CLOSED));
 +
 +    return pipelines;
 +  }
 +
 +  private Pipeline createPipeline(ReplicationConfig repConfig,
 +      Pipeline.PipelineState state) {
 +    return new Pipeline.Builder()
 +        .setId(PipelineID.randomId())
 +        .setCreateTimestamp(System.currentTimeMillis())
 +        .setState(state)
 +        .setReplicationConfig(repConfig)
 +        .setNodes(createDatanodeDetails(1))
 +        .build();
 +  }
 +
 +  private List<DatanodeDetails> createDatanodeDetails(int count) {
 +    List<DatanodeDetails> dns = new ArrayList<>();
 +    for (int i = 0; i < count; i++) {
 +      HddsProtos.DatanodeDetailsProto dnd =
 +          HddsProtos.DatanodeDetailsProto.newBuilder()
 +              .setHostName("host" + i)
 +              .setIpAddress("1.2.3." + (i + 1))
 +              .setNetworkLocation("/default")
 +              .setNetworkName("host" + i)
 +              .addPorts(HddsProtos.Port.newBuilder()
 +                  .setName("ratis").setValue(5678).build())
 +              .setUuid(UUID.randomUUID().toString())
 +              .build();
 +      dns.add(DatanodeDetails.getFromProtoBuf(dnd));
 +    }
 +    return dns;
 +  }
 +}
 +
 +
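
The -t/-r/-s tests above only observe the subcommand through its printed output. As a hedged sketch of the equivalent client-side filtering (the ListPipelinesSubcommand implementation itself is not part of this diff):

    // Keep only OPEN Ratis THREE pipelines from the SCM listing.
    ReplicationConfig wanted = RatisReplicationConfig.getInstance(THREE);
    List<Pipeline> matches = new ArrayList<>();
    for (Pipeline p : scmClient.listPipelines()) {
      if (wanted.equals(p.getReplicationConfig())
          && p.getPipelineState() == Pipeline.PipelineState.OPEN) {
        matches.add(p);
      }
    }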
diff --cc hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 405182f,0000000..b0e9755
mode 100644,000000..100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@@ -1,419 -1,0 +1,420 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.client.io;
 +
 +import com.google.common.base.Preconditions;
 +import org.apache.hadoop.fs.FSExceptionMessages;
 +import org.apache.hadoop.hdds.client.BlockID;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.XceiverClientFactory;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 +import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.EOFException;
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.function.Function;
 +
 +/**
 + * Class to read data from an EC Block Group.
 + */
 +public class ECBlockInputStream extends BlockExtendedInputStream {
 +
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(ECBlockInputStream.class);
 +
 +  private final ECReplicationConfig repConfig;
 +  private final int ecChunkSize;
 +  private final long stripeSize;
 +  private final BlockInputStreamFactory streamFactory;
 +  private final boolean verifyChecksum;
 +  private final XceiverClientFactory xceiverClientFactory;
 +  private final Function<BlockID, Pipeline> refreshFunction;
 +  private final OmKeyLocationInfo blockInfo;
 +  private final DatanodeDetails[] dataLocations;
 +  private final BlockExtendedInputStream[] blockStreams;
 +  private final int maxLocations;
 +
 +  private long position = 0;
 +  private boolean closed = false;
 +  private boolean seeked = false;
 +
 +  protected OmKeyLocationInfo getBlockInfo() {
 +    return blockInfo;
 +  }
 +
 +  protected ECReplicationConfig getRepConfig() {
 +    return repConfig;
 +  }
 +
 +  protected DatanodeDetails[] getDataLocations() {
 +    return dataLocations;
 +  }
 +
 +  protected long getStripeSize() {
 +    return stripeSize;
 +  }
 +
 +  /**
 +   * Returns the number of available data locations, taking account of the
 +   * expected number of locations. E.g., if the block is less than 1 EC chunk,
 +   * we only expect 1 data location. If it is between 1 and 2 chunks, we expect
 +   * there to be 2 locations, and so on.
 +   * @param expectedLocations The maximum number of allowed data locations,
 +   *                          depending on the block size.
 +   * @return The number of available data locations.
 +   */
 +  protected int availableDataLocations(int expectedLocations) {
 +    int count = 0;
 +    for (int i = 0; i < repConfig.getData() && i < expectedLocations; i++) {
 +      if (dataLocations[i] != null) {
 +        count++;
 +      }
 +    }
 +    return count;
 +  }
 +
 +  protected int availableParityLocations() {
 +    int count = 0;
 +    for (int i = repConfig.getData();
 +         i < repConfig.getData() + repConfig.getParity(); i++) {
 +      if (dataLocations[i] != null) {
 +        count++;
 +      }
 +    }
 +    return count;
 +  }
 +
 +  public ECBlockInputStream(ECReplicationConfig repConfig,
 +      OmKeyLocationInfo blockInfo, boolean verifyChecksum,
 +      XceiverClientFactory xceiverClientFactory, Function<BlockID,
 +      Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
 +    this.repConfig = repConfig;
 +    this.ecChunkSize = repConfig.getEcChunkSize();
 +    this.verifyChecksum = verifyChecksum;
 +    this.blockInfo = blockInfo;
 +    this.streamFactory = streamFactory;
 +    this.xceiverClientFactory = xceiverClientFactory;
 +    this.refreshFunction = refreshFunction;
 +    this.maxLocations = repConfig.getData() + repConfig.getParity();
 +    this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
 +    this.blockStreams =
 +        new BlockExtendedInputStream[repConfig.getRequiredNodes()];
 +
 +    this.stripeSize = (long)ecChunkSize * repConfig.getData();
 +    setBlockLocations(this.blockInfo.getPipeline());
 +  }
 +
 +  public synchronized boolean hasSufficientLocations() {
 +    // The number of locations needed is a function of the EC Chunk size. If the
 +    // block length is <= the chunk size, only the first location is needed. If it
 +    // is greater than the chunk size but less than chunk_size * 2, then we must
 +    // have two locations. If it is greater than chunk_size * data_num, then we
 +    // must have all data_num locations.
 +    // We only consider data locations here.
 +    int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
 +    return expectedDataBlocks == availableDataLocations(expectedDataBlocks);
 +  }
 +
 +  protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
 +    return ECBlockInputStreamProxy.expectedDataLocations(rConfig, getLength());
 +  }
 +
 +  /**
 +   * Using the current position, returns the index of the blockStream we should
 +   * be reading from. This is the index in the internal array holding the
 +   * stream reference. The block group index will be one greater than this.
 +   * @return The index of the current block stream within blockStreams.
 +   */
 +  protected int currentStreamIndex() {
 +    return (int)((position / ecChunkSize) % repConfig.getData());
 +  }
 +
 +  /**
 +   * Uses the current position and ecChunkSize to determine which of the
 +   * internal block streams the next read should come from. Also opens the
 +   * stream if it has not been opened already.
 +   * @return BlockInput stream to read from.
 +   */
 +  protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
 +    BlockExtendedInputStream stream = blockStreams[locationIndex];
 +    if (stream == null) {
 +      // To read an EC block, we create a STANDALONE pipeline that contains the
 +      // single location for the block index we want to read. The EC blocks are
 +      // indexed from 1 to N, however the data locations are stored in the
 +      // dataLocations array indexed from zero.
 +      Pipeline pipeline = Pipeline.newBuilder()
-           .setReplicationConfig(new StandaloneReplicationConfig(
++          .setReplicationConfig(StandaloneReplicationConfig.getInstance(
 +              HddsProtos.ReplicationFactor.ONE))
 +          .setNodes(Arrays.asList(dataLocations[locationIndex]))
 +          .setId(PipelineID.randomId())
 +          .setState(Pipeline.PipelineState.CLOSED)
 +          .build();
 +
 +      OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
 +          .setBlockID(blockInfo.getBlockID())
 +          .setLength(internalBlockLength(locationIndex + 1))
 +          .setPipeline(blockInfo.getPipeline())
 +          .setToken(blockInfo.getToken())
 +          .setPartNumber(blockInfo.getPartNumber())
 +          .build();
 +      stream = streamFactory.create(
-           new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
++          StandaloneReplicationConfig.getInstance(
++              HddsProtos.ReplicationFactor.ONE),
 +          blkInfo, pipeline,
 +          blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
 +          refreshFunction);
 +      blockStreams[locationIndex] = stream;
 +    }
 +    return stream;
 +  }
 +
 +  /**
 +   * Returns the length of the Nth block in the block group, taking account of a
 +   * potentially partial last stripe. Note that the internal block index is
 +   * numbered starting from 1.
 +   * @param index - Index number of the internal block, starting from 1
 +   * @return The length in bytes of the internal block at the given index.
 +   */
 +  protected long internalBlockLength(int index) {
 +    long lastStripe = blockInfo.getLength() % stripeSize;
 +    long blockSize = (blockInfo.getLength() - lastStripe) / repConfig.getData();
 +    long lastCell = lastStripe / ecChunkSize + 1;
 +    long lastCellLength = lastStripe % ecChunkSize;
 +
 +    if (index > repConfig.getData()) {
 +      // It is a parity block, whose size is driven by the size of the
 +      // first block of the block group. All parity blocks have the same size
 +      // as block_1.
 +      index = 1;
 +    }
 +
 +    if (index < lastCell) {
 +      return blockSize + ecChunkSize;
 +    } else if (index == lastCell) {
 +      return blockSize + lastCellLength;
 +    } else {
 +      return blockSize;
 +    }
 +  }
 +
 +  private void setBlockLocations(Pipeline pipeline) {
 +    for (DatanodeDetails node : pipeline.getNodes()) {
 +      int index = pipeline.getReplicaIndex(node);
 +      addBlockLocation(index, node);
 +    }
 +  }
 +
 +  private void addBlockLocation(int index, DatanodeDetails location) {
 +    if (index > maxLocations) {
 +      throw new IndexOutOfBoundsException("The index " + index + " is greater "
 +          + "than the EC Replication Config (" + repConfig + ")");
 +    }
 +    dataLocations[index - 1] = location;
 +  }
 +
 +  protected long blockLength() {
 +    return blockInfo.getLength();
 +  }
 +
 +  protected long remaining() {
 +    return blockLength() - position;
 +  }
 +
 +  /**
 +   * Read from the internal BlockInputStreams one EC cell at a time into the
 +   * strategy buffer. This call may read from several internal BlockInputStreams
 +   * if there is sufficient space in the buffer.
 +   * @param strategy The strategy used to read data into the target buffer.
 +   * @return The total number of bytes read, or EOF if nothing remains.
 +   * @throws IOException if a read from a data location fails.
 +   */
 +  @Override
 +  protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
 +      throws IOException {
 +    Preconditions.checkArgument(strategy != null);
 +    checkOpen();
 +
 +    if (remaining() == 0) {
 +      return EOF;
 +    }
 +
 +    int totalRead = 0;
 +    while (strategy.getTargetLength() > 0 && remaining() > 0) {
 +      try {
 +        int currentIndex = currentStreamIndex();
 +        BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
 +        int read = readFromStream(stream, strategy);
 +        totalRead += read;
 +        position += read;
 +      } catch (IOException ioe) {
 +        throw new BadDataLocationException(
 +            dataLocations[currentStreamIndex()], ioe);
 +      }
 +    }
 +    return totalRead;
 +  }
 +
 +  @Override
 +  public synchronized long getRemaining() {
 +    return blockInfo.getLength() - position;
 +  }
 +
 +  @Override
 +  public synchronized long getLength() {
 +    return blockInfo.getLength();
 +  }
 +
 +  @Override
 +  public BlockID getBlockID() {
 +    return blockInfo.getBlockID();
 +  }
 +
 +  protected void seekStreamIfNecessary(BlockExtendedInputStream stream,
 +      long partialChunkSize) throws IOException {
 +    if (seeked) {
 +      // Seek on the underlying streams is performed lazily, as there is a
 +      // possibility a read after a seek may only read a small amount of data.
 +      // Once this block stream has been seeked, we always check the position,
 +      // but in the usual case, where there are no seeks at all, we don't need
 +      // to do this extra work.
 +      long basePosition = (position / stripeSize) * (long)ecChunkSize;
 +      long streamPosition = basePosition + partialChunkSize;
 +      if (streamPosition != stream.getPos()) {
 +        // This ECBlockInputStream has been seeked, so the underlying
 +        // block stream is no longer at the correct position. Therefore we need
 +        // to seek it too.
 +        stream.seek(streamPosition);
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Read the most allowable amount of data from the current stream. This
 +   * ensures we don't read past the end of an EC cell or the overall block
 +   * group length.
 +   * @param stream Stream to read from
 +   * @param strategy The ReaderStrategy to read data into
 +   * @return The number of bytes actually read from the stream.
 +   * @throws IOException
 +   */
 +  private int readFromStream(BlockExtendedInputStream stream,
 +      ByteReaderStrategy strategy)
 +      throws IOException {
 +    long partialPosition = position % ecChunkSize;
 +    seekStreamIfNecessary(stream, partialPosition);
 +    long ecLimit = ecChunkSize - partialPosition;
 +    // Free space in the buffer to read into
 +    long bufLimit = strategy.getTargetLength();
 +    // How much we can read, the lower of the EC Cell, buffer and overall block
 +    // remaining.
 +    int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), remaining());
 +    int actualRead = strategy.readFromBlock(stream, expectedRead);
 +    if (actualRead == -1) {
 +      // The Block Stream reached EOF, but we did not expect it to, so the block
 +      // might be corrupt.
 +      throw new IOException("Expected to read " + expectedRead + " but got EOF"
 +          + " from blockGroup " + stream.getBlockID() + " index "
 +          + (currentStreamIndex() + 1));
 +    }
 +    return actualRead;
 +  }
 +
 +  /**
 +   * Verify that the input stream is open.
 +   * @throws IOException if the stream has been closed.
 +   */
 +  private void checkOpen() throws IOException {
 +    if (closed) {
 +      throw new IOException(
 +          ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Block: "
 +              + blockInfo.getBlockID());
 +    }
 +  }
 +
 +  @Override
 +  public synchronized void close() {
 +    closeStreams();
 +    closed = true;
 +  }
 +
 +  protected synchronized void closeStreams() {
 +    for (int i = 0; i < blockStreams.length; i++) {
 +      if (blockStreams[i] != null) {
 +        try {
 +          blockStreams[i].close();
 +          blockStreams[i] = null;
 +        } catch (IOException e) {
 +          LOG.error("Failed to close stream {}", blockStreams[i], e);
 +        }
 +      }
 +    }
 +    // If the streams have been closed outside of a close() call, then it may
 +    // be due to freeing resources. If they are reopened, then we will need to
 +    // seek the stream to its expected position when the next read is attempted.
 +    seeked = true;
 +  }
 +
 +  @Override
 +  public synchronized void unbuffer() {
 +    for (BlockExtendedInputStream stream : blockStreams) {
 +      if (stream != null) {
 +        stream.unbuffer();
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public synchronized void seek(long pos) throws IOException {
 +    checkOpen();
 +    if (pos < 0 || pos >= getLength()) {
 +      if (pos == 0) {
 +        // It is possible for length and pos to be zero in which case
 +        // seek should return instead of throwing an exception
 +        return;
 +      }
 +      throw new EOFException(
 +          "EOF encountered at pos: " + pos + " for block: "
 +              + blockInfo.getBlockID());
 +    }
 +    position = pos;
 +    seeked = true;
 +  }
 +
 +  @Override
 +  public synchronized long getPos() {
 +    return position;
 +  }
 +
 +  protected synchronized void setPos(long pos) {
 +    position = pos;
 +  }
 +
 +  @Override
 +  public synchronized boolean seekToNewSource(long l) throws IOException {
 +    return false;
 +  }
- }
++}
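
A worked example of the internalBlockLength() arithmetic above, with illustrative numbers: ECReplicationConfig(3, 2), a 1MB ecChunkSize and a 3.5MB block group.

    // stripeSize     = 1MB * 3 data blocks   = 3MB
    // lastStripe     = 3.5MB % 3MB           = 0.5MB
    // blockSize      = (3.5MB - 0.5MB) / 3   = 1MB
    // lastCell       = 0.5MB / 1MB + 1       = 1    (integer division)
    // lastCellLength = 0.5MB % 1MB           = 0.5MB
    //
    // internalBlockLength(1): index == lastCell -> 1MB + 0.5MB = 1.5MB
    // internalBlockLength(2): index >  lastCell -> 1MB
    // internalBlockLength(3): index >  lastCell -> 1MB
    // internalBlockLength(4) and (5): parity, treated as index 1 -> 1.5MB
    //
    // The data blocks sum to 1.5 + 1 + 1 = 3.5MB, matching the block group
    // length, and every parity block has the same size as block_1.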
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
index a742ab9,650fc91..aa64fd2
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
@@@ -115,52 -111,4 +115,56 @@@ public class TestOmBucketInfo 
      Assert.assertEquals((int) 1, cloneBucketInfo.getAcls().size());
  
    }
 +
 +  @Test
 +  public void getProtobufMessageEC() {
 +    OmBucketInfo omBucketInfo =
 +        OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
 +            .setCreationTime(Time.now()).setIsVersionEnabled(false)
 +            .setStorageType(StorageType.ARCHIVE).setAcls(Collections
 +            .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
 +                "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
 +                OzoneAcl.AclScope.ACCESS))).build();
 +    OzoneManagerProtocolProtos.BucketInfo protobuf = omBucketInfo.getProtobuf();
 +    // No EC Config
 +    Assert.assertFalse(protobuf.hasDefaultReplicationConfig());
 +
 +    // Reconstruct object from Proto
 +    OmBucketInfo recovered = OmBucketInfo.getFromProtobuf(protobuf);
 +    Assert.assertNull(recovered.getDefaultReplicationConfig());
 +
 +    // EC Config
-     omBucketInfo =
-         OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
-             .setCreationTime(Time.now()).setIsVersionEnabled(false)
-             .setStorageType(StorageType.ARCHIVE).setAcls(Collections
-             .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
-                 "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
-                 OzoneAcl.AclScope.ACCESS))).setDefaultReplicationConfig(
++    omBucketInfo = OmBucketInfo.newBuilder()
++        .setBucketName("bucket")
++        .setVolumeName("vol1")
++        .setCreationTime(Time.now())
++        .setIsVersionEnabled(false)
++        .setStorageType(StorageType.ARCHIVE)
++        .setAcls(Collections.singletonList(new OzoneAcl(
++            IAccessAuthorizer.ACLIdentityType.USER,
++            "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
++            OzoneAcl.AclScope.ACCESS)))
++        .setDefaultReplicationConfig(
 +            new DefaultReplicationConfig(ReplicationType.EC,
 +                new ECReplicationConfig(3, 2))).build();
 +    protobuf = omBucketInfo.getProtobuf();
 +
 +    Assert.assertTrue(protobuf.hasDefaultReplicationConfig());
 +    Assert.assertEquals(3,
 +        protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
 +            .getData());
 +    Assert.assertEquals(2,
 +        protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
 +            .getParity());
 +
 +    // Reconstruct object from Proto
 +    recovered = OmBucketInfo.getFromProtobuf(protobuf);
 +    Assert.assertEquals(ReplicationType.EC,
 +        recovered.getDefaultReplicationConfig().getType());
 +    ECReplicationConfig config =
 +        recovered.getDefaultReplicationConfig().getEcReplicationConfig();
 +    Assert.assertNotNull(config);
 +    Assert.assertEquals(3, config.getData());
 +    Assert.assertEquals(2, config.getParity());
 +  }
  }
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
index 4440ce0,3b04ed4..a6a0ec9
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
@@@ -25,9 -23,9 +25,10 @@@ import org.apache.hadoop.hdds.client.St
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
  import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+ import org.apache.hadoop.ozone.ClientVersion;
  import org.apache.hadoop.ozone.OzoneAcl;
  import org.apache.hadoop.ozone.om.helpers.OmKeyInfo.Builder;
 +import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
  import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
  import org.apache.hadoop.util.Time;
  import org.junit.Assert;
@@@ -38,10 -36,6 +39,9 @@@ import java.util.Arrays
  import java.util.Collections;
  import java.util.List;
  
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
- import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
  import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
  
  /**
@@@ -51,57 -45,7 +51,57 @@@ public class TestOmKeyInfo 
  
    @Test
    public void protobufConversion() {
-     OmKeyInfo key =
-         createOmKeyInfo(new RatisReplicationConfig(ReplicationFactor.THREE));
 -    OmKeyInfo key = new Builder()
++    OmKeyInfo key = createOmKeyInfo(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
 +
-     OmKeyInfo keyAfterSerialization =
-         OmKeyInfo.getFromProtobuf(key.getProtobuf(CURRENT_VERSION));
++    OmKeyInfo keyAfterSerialization = OmKeyInfo.getFromProtobuf(
++        key.getProtobuf(ClientVersion.CURRENT_VERSION));
 +
 +    Assert.assertEquals(key, keyAfterSerialization);
 +  }
 +
 +  @Test
 +  public void getProtobufMessageEC() {
-     OmKeyInfo key =
-         createOmKeyInfo(new RatisReplicationConfig(ReplicationFactor.THREE));
++    OmKeyInfo key = createOmKeyInfo(
++        RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
 +    OzoneManagerProtocolProtos.KeyInfo omKeyProto =
-         key.getProtobuf(CURRENT_VERSION);
++        key.getProtobuf(ClientVersion.CURRENT_VERSION);
 +
 +    // No EC Config
 +    Assert.assertFalse(omKeyProto.hasEcReplicationConfig());
 +    Assert.assertEquals(THREE, omKeyProto.getFactor());
 +    Assert.assertEquals(RATIS, omKeyProto.getType());
 +
 +    // Reconstruct object from Proto
 +    OmKeyInfo recovered = OmKeyInfo.getFromProtobuf(omKeyProto);
 +    Assert.assertEquals(RATIS,
 +        recovered.getReplicationConfig().getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof RatisReplicationConfig);
 +
 +    // EC Config
 +    key = createOmKeyInfo(new ECReplicationConfig(3, 2));
-     omKeyProto = key.getProtobuf(CURRENT_VERSION);
++    omKeyProto = key.getProtobuf(ClientVersion.CURRENT_VERSION);
 +
 +    Assert.assertEquals(3, omKeyProto.getEcReplicationConfig().getData());
 +    Assert.assertEquals(2, omKeyProto.getEcReplicationConfig().getParity());
 +    Assert.assertFalse(omKeyProto.hasFactor());
 +    Assert.assertEquals(EC, omKeyProto.getType());
 +
 +    // Reconstruct object from Proto
 +    recovered = OmKeyInfo.getFromProtobuf(omKeyProto);
 +    Assert.assertEquals(EC,
 +        recovered.getReplicationConfig().getReplicationType());
 +    Assert.assertTrue(
 +        recovered.getReplicationConfig() instanceof ECReplicationConfig);
 +    ECReplicationConfig config =
 +        (ECReplicationConfig) recovered.getReplicationConfig();
 +    Assert.assertEquals(3, config.getData());
 +    Assert.assertEquals(2, config.getParity());
 +  }
 +
 +  private OmKeyInfo createOmKeyInfo(ReplicationConfig replicationConfig) {
 +    return new Builder()
          .setKeyName("key1")
          .setBucketName("bucket")
          .setVolumeName("vol1")
@@@ -211,4 -161,4 +211,4 @@@
          .setPipeline(pipeline)
          .build();
    }
--}
++}
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
index 59aeece,ee850bc..c6f8818
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
@@@ -42,90 -34,22 +42,91 @@@ import static org.junit.Assert.assertNo
  public class TestOmMultipartKeyInfo {
  
    @Test
 -  public void testCopyObject() {
 -    OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo.Builder()
 -        .setUploadID(UUID.randomUUID().toString())
 -        .setCreationTime(Time.now())
 -        .setReplicationConfig(RatisReplicationConfig
 -            .getInstance(HddsProtos.ReplicationFactor.THREE))
 +  public void copyObject() {
 +    for (ReplicationConfig param : replicationConfigs().collect(toList())) {
 +      testCopyObject(param);
 +    }
 +  }
 +
 +  //@ParameterizedTest
 +  //@MethodSource("replicationConfigs")
 +  private void testCopyObject(ReplicationConfig replicationConfig) {
 +    // GIVEN
 +    OmMultipartKeyInfo subject = createSubject()
 +        .setReplicationConfig(replicationConfig)
 +        .build();
 +
 +    // WHEN
 +    OmMultipartKeyInfo copy = subject.copyObject();
 +
 +    // THEN
 +    assertNotSame(subject, copy);
 +    assertEquals(subject, copy);
 +    assertEquals(replicationConfig, copy.getReplicationConfig());
 +  }
 +
 +  @Test
 +  public void protoConversion() {
 +    for (ReplicationConfig param : replicationConfigs().collect(toList())) {
 +      protoConversion(param);
 +    }
 +  }
 +
 +  //@ParameterizedTest
 +  //@MethodSource("replicationConfigs")
 +  private void protoConversion(ReplicationConfig replicationConfig) {
 +    // GIVEN
 +    OmMultipartKeyInfo subject = createSubject()
 +        .setReplicationConfig(replicationConfig)
          .build();
  
 -    OmMultipartKeyInfo cloneMultipartKeyInfo = omMultipartKeyInfo.copyObject();
 +    // WHEN
 +    OzoneManagerProtocolProtos.MultipartKeyInfo proto = subject.getProto();
 +    OmMultipartKeyInfo fromProto = OmMultipartKeyInfo.getFromProto(proto);
 +
 +    // THEN
 +    assertEquals(subject, fromProto);
 +    assertEquals(replicationConfig, fromProto.getReplicationConfig());
 +  }
 +
 +  private static Stream<ReplicationConfig> replicationConfigs() {
 +    return Stream.of(
-         new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
++        StandaloneReplicationConfig.getInstance(
++            HddsProtos.ReplicationFactor.ONE),
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
 +        new ECReplicationConfig(3, 2)
 +    );
 +  }
 +
 +  @Test
 +  public void distinctListOfParts() {
 +    // GIVEN
 +    OmMultipartKeyInfo subject = createSubject().build();
 +    OmMultipartKeyInfo copy = subject.copyObject();
 +
 +    // WHEN
 +    subject.addPartKeyInfo(1, createPart(createKeyInfo()).build());
 +
 +    // THEN
 +    assertEquals(0, copy.getPartKeyInfoMap().size());
 +    assertEquals(1, subject.getPartKeyInfoMap().size());
 +  }
  
 -    Assert.assertEquals(cloneMultipartKeyInfo, omMultipartKeyInfo);
 +  private static OmMultipartKeyInfo.Builder createSubject() {
 +    return new OmMultipartKeyInfo.Builder()
 +        .setUploadID(UUID.randomUUID().toString())
 +        .setCreationTime(Time.now());
 +  }
  
 -    // Just setting dummy values for this test.
 -    omMultipartKeyInfo.addPartKeyInfo(1,
 -        PartKeyInfo.newBuilder().setPartNumber(1).setPartName("/path")
 -            .setPartKeyInfo(KeyInfo.newBuilder()
 +  private static PartKeyInfo.Builder createPart(KeyInfo.Builder partKeyInfo) {
 +    return PartKeyInfo.newBuilder()
 +        .setPartNumber(1)
 +        .setPartName("/path")
 +        .setPartKeyInfo(partKeyInfo);
 +  }
 +
 +  private static KeyInfo.Builder createKeyInfo() {
 +    return KeyInfo.newBuilder()
          .setVolumeName(UUID.randomUUID().toString())
          .setBucketName(UUID.randomUUID().toString())
          .setKeyName(UUID.randomUUID().toString())
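
The commented-out @ParameterizedTest/@MethodSource markers above are stand-ins while this module is still on JUnit 4; the manual for-loops provide the same coverage. For reference, a minimal sketch of the JUnit 5 form, assuming junit-jupiter-params were on the classpath (placeholder class and assertion, not the real test):

    import java.util.stream.Stream;

    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.MethodSource;

    import static org.junit.jupiter.api.Assertions.assertNotNull;

    class ReplicationParamSketch {

      // Same shape as replicationConfigs() above, reduced to plain strings.
      static Stream<String> replicationConfigs() {
        return Stream.of("STANDALONE/ONE", "RATIS/THREE", "rs-3-2");
      }

      @ParameterizedTest
      @MethodSource("replicationConfigs")
      void roundTrip(String config) {
        // Placeholder body standing in for the copy/proto round-trip checks.
        assertNotNull(config);
      }
    }
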
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
index d06932a,0000000..558676d
mode 100644,000000..100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
@@@ -1,98 -1,0 +1,99 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.om.helpers;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
++import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
 +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 +
 +/**
 + * Tests for the QuotaUtil class.
 + */
 +public class TestQuotaUtil {
 +
 +  private static final int ONE_MB = 1024 * 1024;
 +
 +  @Test
 +  public void testRatisThreeReplication() {
-     RatisReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
 +    long replicatedSize =
 +        QuotaUtil.getReplicatedSize(123 * ONE_MB, repConfig);
 +    Assert.assertEquals(123 * ONE_MB * 3, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testRatisOneReplication() {
-     RatisReplicationConfig repConfig = new RatisReplicationConfig(ONE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(ONE);
 +    long replicatedSize =
 +        QuotaUtil.getReplicatedSize(123 * ONE_MB, repConfig);
 +    Assert.assertEquals(123 * ONE_MB, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECFullStripeReplication() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = ONE_MB * 3 * 123; // 123 full stripes
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    Assert.assertEquals(dataSize + 123 * ONE_MB * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialStripeIntoFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = ONE_MB * 3 * 123 + 10; // 123 full stripes, plus 10 bytes
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected: one full chunk per stripe on each of the 2 parity replicas,
 +    // plus another 10 bytes in each parity
 +    Assert.assertEquals(dataSize + 123 * ONE_MB * 2 + 10 * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialStripeBeyondFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    // 123 full stripes, plus 1MB+10 bytes
 +    long dataSize = ONE_MB * 3 * 123 + ONE_MB + 10;
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected: one full chunk per stripe on each of the 2 parity replicas,
 +    // plus another full 1MB chunk in each parity
 +    Assert.assertEquals(
 +        dataSize + 123 * ONE_MB * 2 + ONE_MB * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialSingleStripeFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = 10;
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected is 10 bytes in each of the 2 parity replicas (no full stripes)
 +    Assert.assertEquals(dataSize + 10 * 2, replicatedSize);
 +  }
 +
 +  @Test
 +  public void testECPartialSingleBeyondFirstChunk() {
 +    ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
 +    long dataSize = 2 * ONE_MB + 10;
 +    long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
 +    // Expected is one full 1MB parity chunk in each of the 2 parity replicas
 +    Assert.assertEquals(dataSize + ONE_MB * 2, replicatedSize);
 +  }
 +
 +}
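
The six cases above all follow one stripe rule: each parity replica stores one full chunk per full stripe, plus a partial chunk, capped at the chunk size, for any trailing partial stripe. A minimal sketch of that arithmetic as I read the assertions; QuotaUtil.getReplicatedSize remains the authoritative implementation:

    import static java.lang.Math.min;

    /** Illustrative EC replicated-size arithmetic, not the real QuotaUtil. */
    public final class EcReplicatedSizeSketch {
      private EcReplicatedSizeSketch() {
      }

      static long replicatedSize(long dataSize, int data, int parity,
          int chunkSize) {
        long stripeSize = (long) data * chunkSize;
        long fullStripes = dataSize / stripeSize;
        long partial = dataSize % stripeSize;
        // Each parity replica holds one full chunk per full stripe, plus a
        // partial chunk (capped at chunkSize) for a trailing partial stripe.
        long parityPerReplica =
            fullStripes * chunkSize + min(partial, chunkSize);
        return dataSize + parityPerReplica * parity;
      }

      public static void main(String[] args) {
        int oneMB = 1024 * 1024;
        // rs-3-2, 123 full stripes: data plus 123 chunks on each parity.
        long dataSize = 123L * 3 * oneMB;
        System.out.println(replicatedSize(dataSize, 3, 2, oneMB)
            == dataSize + 123L * oneMB * 2); // true
      }
    }
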
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index fea70ac,bf24ae9..4c4bdc6
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@@ -1196,66 -1188,6 +1196,66 @@@ public class TestOzoneFileSystem 
    }
  
    @Test
 +  public void testCreateKeyShouldUseRefreshedBucketReplicationConfig()
 +      throws IOException {
 +    OzoneBucket bucket =
 +        TestDataUtil.createVolumeAndBucket(cluster, bucketLayout);
 +    final TestClock testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
 +
 +    String rootPath = String
 +        .format("%s://%s.%s/", OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
 +            bucket.getVolumeName());
 +
 +    // Set the fs.defaultFS and start the filesystem
 +    Configuration conf = new OzoneConfiguration(cluster.getConf());
 +    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
 +    OzoneFileSystem o3FS = (OzoneFileSystem) FileSystem.get(conf);
 +
 +    // Reset the clock so the test controls time.
 +    ((BasicOzoneClientAdapterImpl) (o3FS.getAdapter())).setClock(testClock);
 +
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key"),
 +        ReplicationType.RATIS);
 +
 +    bucket.setReplicationConfig(new ECReplicationConfig("rs-3-2-1024k"));
 +
 +    // After changing the bucket replication policy, new keys should be EC,
 +    // but o3fs only refreshes the bucket after some time, so the key will
 +    // still be the old type.
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key1"),
 +        ReplicationType.RATIS);
 +
 +    testClock.fastForward(300 * 1000 + 1);
 +
 +    // After the client bucket refresh interval, keys should be created with
 +    // whatever replication type is set on the bucket at that moment.
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key2"),
 +        ReplicationType.EC);
 +
 +    // Re-run the same steps after switching back to Ratis to check that the
 +    // behavior is consistent.
 +    bucket.setReplicationConfig(
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE));
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
 +
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key3"),
 +        ReplicationType.EC);
 +
 +    testClock.fastForward(300 * 1000 + 1);
 +
 +    createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key4"),
 +        ReplicationType.RATIS);
 +  }
 +
 +  private void createKeyAndAssertKeyType(OzoneBucket bucket,
 +      OzoneFileSystem o3FS, Path keyPath, ReplicationType expectedType)
 +      throws IOException {
 +    o3FS.createFile(keyPath).build().close();
 +    Assert.assertEquals(expectedType.name(),
 +        bucket.getKey(o3FS.pathToKey(keyPath)).getReplicationConfig()
 +            .getReplicationType().name());
 +  }
 +
 +  @Test
    public void testGetTrashRoots() throws IOException {
      String username = UserGroupInformation.getCurrentUser().getShortUserName();
      Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
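
The refresh test above depends on the client caching the bucket's replication config and re-reading it only after a refresh interval, hence the 300-second fast-forward on the injected TestClock. A hypothetical sketch of such a clock-gated cache; the class and constant are invented, not the o3fs adapter's actual internals:

    import java.time.Clock;
    import java.time.Instant;
    import java.util.function.Supplier;

    /** Hypothetical clock-gated cache of a bucket's replication config. */
    final class BucketConfigCache<T> {
      private static final long REFRESH_MILLIS = 300_000L; // assumed 5 minutes

      private final Clock clock;
      private T cached;
      private Instant lastRefresh = Instant.EPOCH;

      BucketConfigCache(Clock clock) {
        this.clock = clock;
      }

      synchronized T get(Supplier<T> loader) {
        Instant now = clock.instant();
        if (cached == null
            || now.toEpochMilli() - lastRefresh.toEpochMilli()
                > REFRESH_MILLIS) {
          cached = loader.get(); // re-read the bucket from OM
          lastRefresh = now;
        }
        return cached;
      }
    }

Injecting a Clock, as the adapter's setClock hook allows, lets the test cross the refresh boundary deterministically instead of sleeping.
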
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 13802a1,ef561e6..b239621
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -68,15 -67,10 +68,15 @@@ public class TestOzoneConfigurationFiel
          ".duration"); // Deprecated config
      configurationPropsToSkipCompare
          .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
 +    // Replication and type configs have moved to the server side.
 +    configurationPropsToSkipCompare
 +        .add(OzoneConfigKeys.OZONE_REPLICATION);
 +    configurationPropsToSkipCompare
 +        .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE);
      configurationPropsToSkipCompare
-         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY);
+         .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY);
      configurationPropsToSkipCompare
-         .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION);
+         .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
      // This property is tested in TestHttpServer2 instead
      xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY);
      addPropertiesNotInXml();
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 610a34c,0000000..4f7881f
mode 100644,000000..100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@@ -1,371 -1,0 +1,371 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.hadoop.ozone.client.rpc;
 +
 +import org.apache.hadoop.conf.StorageUnit;
 +import org.apache.hadoop.hdds.HddsConfigKeys;
 +import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationType;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.OzoneClientConfig;
 +import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 +import org.apache.hadoop.ozone.MiniOzoneCluster;
 +import org.apache.hadoop.ozone.OzoneConfigKeys;
 +import org.apache.hadoop.ozone.client.BucketArgs;
 +import org.apache.hadoop.ozone.client.ObjectStore;
 +import org.apache.hadoop.ozone.client.OzoneBucket;
 +import org.apache.hadoop.ozone.client.OzoneClient;
 +import org.apache.hadoop.ozone.client.OzoneClientFactory;
 +import org.apache.hadoop.ozone.client.OzoneKey;
 +import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 +import org.apache.hadoop.ozone.client.OzoneVolume;
 +import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
 +import org.apache.hadoop.ozone.client.io.KeyOutputStream;
 +import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 +import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 +import org.apache.hadoop.ozone.container.TestHelper;
 +import org.apache.ozone.test.GenericTestUtils;
 +import org.junit.AfterClass;
 +import org.junit.Assert;
 +import org.junit.BeforeClass;
 +import org.junit.Test;
 +
 +import java.io.IOException;
 +import java.util.Arrays;
 +import java.util.HashMap;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.TimeoutException;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
 +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 +
 +/**
 + * Tests key output stream.
 + */
 +public class TestECKeyOutputStream {
 +  private static MiniOzoneCluster cluster;
 +  private static OzoneConfiguration conf = new OzoneConfiguration();
 +  private static OzoneClient client;
 +  private static ObjectStore objectStore;
 +  private static int chunkSize;
 +  private static int flushSize;
 +  private static int maxFlushSize;
 +  private static int blockSize;
 +  private static String volumeName;
 +  private static String bucketName;
 +  private static String keyString;
 +  private static int dataBlocks = 3;
 +  private static int inputSize = dataBlocks * chunkSize;
 +  private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
 +
 +  /**
 +   * Create a MiniOzoneCluster for testing.
 +   */
 +  @BeforeClass
 +  public static void init() throws Exception {
 +    chunkSize = 1024;
 +    flushSize = 2 * chunkSize;
 +    maxFlushSize = 2 * flushSize;
 +    blockSize = 2 * maxFlushSize;
 +
 +    OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
 +    clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
 +    clientConfig.setStreamBufferFlushDelay(false);
 +    conf.setFromObject(clientConfig);
 +
 +    conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
 +    // If SCM detects a dead node too quickly, the container would be moved
 +    // to the CLOSED state and all in-progress writes would fail. To avoid
 +    // that, keep these timeouts high; none of the tests currently depend on
 +    // dead node detection.
 +    conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
 +    conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
 +    conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
 +        TimeUnit.SECONDS);
 +    conf.setTimeDuration(
 +        "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
 +        TimeUnit.SECONDS);
 +    conf.setQuietMode(false);
 +    conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
 +        StorageUnit.MB);
 +    conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
 +        TimeUnit.MILLISECONDS);
 +    conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
 +        TimeUnit.SECONDS);
 +    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
 +        .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
 +        .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
 +        .setStreamBufferMaxSize(maxFlushSize)
 +        .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
 +    cluster.waitForClusterToBeReady();
 +    client = OzoneClientFactory.getRpcClient(conf);
 +    objectStore = client.getObjectStore();
 +    keyString = UUID.randomUUID().toString();
 +    volumeName = "testeckeyoutputstream";
 +    bucketName = volumeName;
 +    objectStore.createVolume(volumeName);
 +    objectStore.getVolume(volumeName).createBucket(bucketName);
 +    initInputChunks();
 +  }
 +
 +  /**
 +   * Shutdown the MiniOzoneCluster.
 +   */
 +  @AfterClass
 +  public static void shutdown() {
 +    if (cluster != null) {
 +      cluster.shutdown();
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateKeyWithECReplicationConfig() throws Exception {
 +    try (OzoneOutputStream key = TestHelper
 +        .createKey(keyString, new ECReplicationConfig(3, 2,
 +              ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
 +            objectStore, volumeName, bucketName)) {
 +      Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateKeyWithOutBucketDefaults() throws Exception {
 +    OzoneVolume volume = objectStore.getVolume(volumeName);
 +    OzoneBucket bucket = volume.getBucket(bucketName);
 +    try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
 +      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        out.write(inputChunks[i]);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateKeyWithBucketDefaults() throws Exception {
 +    String myBucket = UUID.randomUUID().toString();
 +    OzoneVolume volume = objectStore.getVolume(volumeName);
 +    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
 +    bucketArgs.setDefaultReplicationConfig(
 +        new DefaultReplicationConfig(ReplicationType.EC,
 +            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +                chunkSize)));
 +
 +    volume.createBucket(myBucket, bucketArgs.build());
 +    OzoneBucket bucket = volume.getBucket(myBucket);
 +
 +    try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
 +      Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        out.write(inputChunks[i]);
 +      }
 +    }
 +    byte[] buf = new byte[chunkSize];
 +    try (OzoneInputStream in = bucket.readKey(keyString)) {
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        int read = in.read(buf, 0, chunkSize);
 +        Assert.assertEquals(chunkSize, read);
 +        Assert.assertTrue(Arrays.equals(buf, inputChunks[i]));
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
 +    OzoneBucket bucket = getOzoneBucket();
-     try (OzoneOutputStream out = bucket
-         .createKey("testCreateRatisKeyAndWithECBucketDefaults", 2000,
-             new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
-             new HashMap<>())) {
++    try (OzoneOutputStream out = bucket.createKey(
++        "testCreateRatisKeyAndWithECBucketDefaults", 2000,
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
++        new HashMap<>())) {
 +      Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
 +      for (int i = 0; i < inputChunks.length; i++) {
 +        out.write(inputChunks[i]);
 +      }
 +    }
 +  }
 +
 +  @Test
 +  public void test13ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(13);
 +  }
 +
 +  @Test
 +  public void test15ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(15);
 +  }
 +
 +  @Test
 +  public void test20ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(20);
 +  }
 +
 +  @Test
 +  public void test21ChunksInSingleWriteOp() throws IOException {
 +    testMultipleChunksInSingleWriteOp(21);
 +  }
 +
 +  public void testMultipleChunksInSingleWriteOp(int numChunks)
 +      throws IOException {
 +    byte[] inputData = getInputBytes(numChunks);
 +    final OzoneBucket bucket = getOzoneBucket();
 +    String keyName = "testMultipleChunksInSingleWriteOp" + numChunks;
 +    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
 +        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +            chunkSize), new HashMap<>())) {
 +      out.write(inputData);
 +    }
 +
 +    validateContent(inputData, bucket, bucket.getKey(keyName));
 +  }
 +
 +  @Test
 +  public void testECContainerKeysCount()
 +      throws IOException, InterruptedException, TimeoutException {
 +    byte[] inputData = getInputBytes(1);
 +    final OzoneBucket bucket = getOzoneBucket();
 +    ContainerOperationClient containerOperationClient =
 +        new ContainerOperationClient(conf);
 +
 +    ECReplicationConfig repConfig = new ECReplicationConfig(
 +        3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
 +    // Close all EC pipelines so that this test is guaranteed a fresh
 +    // pipeline, and hence a fresh container.
 +    PipelineManager pm =
 +        cluster.getStorageContainerManager().getPipelineManager();
 +    for (Pipeline p : pm.getPipelines(repConfig)) {
 +      pm.closePipeline(p, true);
 +    }
 +
 +    String keyName = UUID.randomUUID().toString();
 +    try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
 +        repConfig, new HashMap<>())) {
 +      out.write(inputData);
 +    }
 +    OzoneKeyDetails key = bucket.getKey(keyName);
 +    long currentKeyContainerID =
 +        key.getOzoneKeyLocations().get(0).getContainerID();
 +
 +    GenericTestUtils.waitFor(() -> {
 +      try {
 +        return containerOperationClient.getContainer(currentKeyContainerID)
 +            .getNumberOfKeys() == 1;
 +      } catch (IOException exception) {
 +        Assert.fail("Unexpected exception " + exception);
 +        return false;
 +      }
 +    }, 100, 10000);
 +    validateContent(inputData, bucket, key);
 +  }
 +
 +  private void validateContent(byte[] inputData, OzoneBucket bucket,
 +      OzoneKey key) throws IOException {
 +    try (OzoneInputStream is = bucket.readKey(key.getName())) {
 +      byte[] fileContent = new byte[inputData.length];
 +      Assert.assertEquals(inputData.length, is.read(fileContent));
 +      Assert.assertEquals(new String(inputData, UTF_8),
 +          new String(fileContent, UTF_8));
 +    }
 +  }
 +
 +  private OzoneBucket getOzoneBucket() throws IOException {
 +    String myBucket = UUID.randomUUID().toString();
 +    OzoneVolume volume = objectStore.getVolume(volumeName);
 +    final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
 +    bucketArgs.setDefaultReplicationConfig(
 +        new DefaultReplicationConfig(ReplicationType.EC,
 +            new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +                chunkSize)));
 +
 +    volume.createBucket(myBucket, bucketArgs.build());
 +    return volume.getBucket(myBucket);
 +  }
 +
 +  private static void initInputChunks() {
 +    for (int i = 0; i < dataBlocks; i++) {
 +      inputChunks[i] = getBytesWith(i + 1, chunkSize);
 +    }
 +  }
 +
 +  private static byte[] getBytesWith(int singleDigitNumber, int total) {
 +    StringBuilder builder = new StringBuilder(total);
 +    for (int i = 1; i <= total; i++) {
 +      builder.append(singleDigitNumber);
 +    }
 +    return builder.toString().getBytes(UTF_8);
 +  }
 +
 +  @Test
 +  public void testWriteShouldSucceedWhenDNKilled() throws Exception {
 +    int numChunks = 3;
 +    byte[] inputData = getInputBytes(numChunks);
 +    final OzoneBucket bucket = getOzoneBucket();
 +    String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
 +    DatanodeDetails nodeToKill = null;
 +    try {
 +      try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
 +          new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
 +              chunkSize), new HashMap<>())) {
 +        out.write(inputData);
 +        // Kill a node from first pipeline
 +        nodeToKill =
 +            ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
 +                .get(0).getPipeline().getFirstNode();
 +        cluster.shutdownHddsDatanode(nodeToKill);
 +
 +        out.write(inputData);
 +        // Check the second blockGroup pipeline to make sure that the
 +        // failed node is not selected.
 +        Assert.assertFalse(
 +            ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
 +                .get(1).getPipeline().getNodes().contains(nodeToKill));
 +      }
 +
 +      try (OzoneInputStream is = bucket.readKey(keyName)) {
 +        // We wrote "inputData" twice, so do two reads and ensure the correct
 +        // data comes back.
 +        for (int i = 0; i < 2; i++) {
 +          byte[] fileContent = new byte[inputData.length];
 +          Assert.assertEquals(inputData.length, is.read(fileContent));
 +          Assert.assertEquals(new String(inputData, UTF_8),
 +              new String(fileContent, UTF_8));
 +        }
 +      }
 +    } finally {
 +      cluster.restartHddsDatanode(nodeToKill, true);
 +    }
 +  }
 +
 +  private byte[] getInputBytes(int numChunks) {
 +    byte[] inputData = new byte[numChunks * chunkSize];
 +    for (int i = 0; i < numChunks; i++) {
 +      int start = (i * chunkSize);
 +      Arrays.fill(inputData, start, start + chunkSize - 1,
 +          String.valueOf(i % 9).getBytes(UTF_8)[0]);
 +    }
 +    return inputData;
 +  }
 +
 +}
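
The chunk-counting tests above write against an rs-3-2 layout where data chunks are striped round-robin across the three data replicas. A small illustrative mapping from byte offset to data replica index, under that assumption (not Ozone's actual placement code):

    /** Illustrative striping math for an rs-3-2 block group. */
    final class EcStripeSketch {
      private EcStripeSketch() {
      }

      /** Data replica index (0-based) holding the chunk at this offset. */
      static int dataReplicaIndexFor(long offset, int dataBlocks,
          int chunkSize) {
        long chunkOrdinal = offset / chunkSize;   // global chunk number
        return (int) (chunkOrdinal % dataBlocks); // round-robin striping
      }

      public static void main(String[] args) {
        int chunkSize = 1024; // matches the test's chunk size
        System.out.println(dataReplicaIndexFor(0, 3, chunkSize));    // 0
        System.out.println(dataReplicaIndexFor(1024, 3, chunkSize)); // 1
        System.out.println(dataReplicaIndexFor(3072, 3, chunkSize)); // 0 again
      }
    }
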
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 6e271fc,6272afa..37dec7c
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@@ -3899,47 -3798,4 +3922,49 @@@ public abstract class TestOzoneRpcClien
      createRequiredForVersioningTest(volumeName, bucketName, keyName, true);
      checkExceptedResultForVersioningTest(volumeName, bucketName, keyName, 2);
    }
 +
 +  @Test
 +  public void testSetECReplicationConfigOnBucket()
 +      throws IOException {
 +    String volumeName = UUID.randomUUID().toString();
 +    store.createVolume(volumeName);
 +    OzoneVolume volume = store.getVolume(volumeName);
 +    OzoneBucket bucket = getBucket(volume);
 +    ReplicationConfig currentReplicationConfig = bucket.getReplicationConfig();
 +    Assert.assertEquals(
-         new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
++        StandaloneReplicationConfig.getInstance(
++            HddsProtos.ReplicationFactor.ONE),
 +        currentReplicationConfig);
 +    ECReplicationConfig ecReplicationConfig =
 +        new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024);
 +    bucket.setReplicationConfig(ecReplicationConfig);
 +
 +    // Get the bucket and check the updated config.
 +    bucket = volume.getBucket(bucket.getName());
 +
 +    Assert.assertEquals(ecReplicationConfig, bucket.getReplicationConfig());
 +
 +    RatisReplicationConfig ratisReplicationConfig =
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +    bucket.setReplicationConfig(ratisReplicationConfig);
 +
 +    // Get the bucket and check the updated config.
 +    bucket = volume.getBucket(bucket.getName());
 +
 +    Assert.assertEquals(ratisReplicationConfig, bucket.getReplicationConfig());
 +
 +    //Reset replication config back.
 +    bucket.setReplicationConfig(currentReplicationConfig);
 +  }
 +
 +  private OzoneBucket getBucket(OzoneVolume volume) throws IOException {
 +    String bucketName = UUID.randomUUID().toString();
 +    BucketArgs.Builder builder = BucketArgs.newBuilder();
 +    builder.setVersioning(true).setDefaultReplicationConfig(
 +        new DefaultReplicationConfig(
-             new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE)));
++            StandaloneReplicationConfig.getInstance(
++                HddsProtos.ReplicationFactor.ONE)));
 +    volume.createBucket(bucketName, builder.build());
 +    return volume.getBucket(bucketName);
 +  }
  }
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
index dcc03bc,0000000..8532835
mode 100644,000000..100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
@@@ -1,111 -1,0 +1,111 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.client.rpc.read;
 +
 +import org.apache.hadoop.hdds.client.BlockID;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 +import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
 +import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
 +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
 +import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
 +import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 +import org.junit.Test;
 +import org.junit.Assert;
 +
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.Map;
 +
 +/**
 + * Tests for BlockInputStreamFactoryImpl.
 + */
 +public class TestBlockInputStreamFactoryImpl {
 +
 +  @Test
 +  public void testNonECGivesBlockInputStream() {
 +    BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
 +    ReplicationConfig repConfig =
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +
 +    OmKeyLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
 +        1024 * 1024 * 10);
 +
 +    BlockExtendedInputStream stream =
 +        factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
 +            blockInfo.getToken(), true, null, null);
 +    Assert.assertTrue(stream instanceof BlockInputStream);
 +    Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
 +    Assert.assertEquals(stream.getLength(), blockInfo.getLength());
 +  }
 +
 +  @Test
 +  public void testECGivesECBlockInputStream() {
 +    BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
 +    ReplicationConfig repConfig =
 +        new ECReplicationConfig(3, 2);
 +
 +    OmKeyLocationInfo blockInfo =
 +        createKeyLocationInfo(repConfig, 5, 1024 * 1024 * 10);
 +
 +    BlockExtendedInputStream stream =
 +        factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
 +            blockInfo.getToken(), true, null, null);
 +    Assert.assertTrue(stream instanceof ECBlockInputStreamProxy);
 +    Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
 +    Assert.assertEquals(stream.getLength(), blockInfo.getLength());
 +  }
 +
 +  private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
 +      long blockLength, Map<DatanodeDetails, Integer> dnMap) {
 +
 +    Pipeline pipeline = Pipeline.newBuilder()
 +        .setState(Pipeline.PipelineState.CLOSED)
 +        .setId(PipelineID.randomId())
 +        .setNodes(new ArrayList<>(dnMap.keySet()))
 +        .setReplicaIndexes(dnMap)
 +        .setReplicationConfig(repConf)
 +        .build();
 +
 +    OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
 +        .setBlockID(new BlockID(1, 1))
 +        .setLength(blockLength)
 +        .setOffset(0)
 +        .setPipeline(pipeline)
 +        .setPartNumber(0)
 +        .build();
 +    return keyInfo;
 +  }
 +
 +  private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
 +      int nodeCount, long blockLength) {
 +    Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
 +    for (int i = 0; i < nodeCount; i++) {
 +      datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
 +    }
 +    return createKeyLocationInfo(repConf, blockLength, datanodes);
 +  }
 +
 +}
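
The two tests reduce to one dispatch rule: EC replication gets the reconstructing proxy stream, everything else a plain per-replica block stream. A toy sketch of that rule with placeholder types, not the real BlockInputStreamFactory API:

    /** Placeholder types; the real factory returns BlockExtendedInputStream. */
    interface BlockStream { }

    final class StreamFactorySketch {
      enum ReplicationKind { STANDALONE, RATIS, EC }

      static class PlainBlockStream implements BlockStream { }

      static class EcProxyStream implements BlockStream { }

      BlockStream create(ReplicationKind kind) {
        // Only EC block groups need striped, reconstructing reads.
        return kind == ReplicationKind.EC
            ? new EcProxyStream()
            : new PlainBlockStream();
      }
    }
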
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 7ab6f3f,7f0ab38..07c2433
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@@ -171,14 -169,8 +171,14 @@@ public abstract class TestInputStreamBa
    }
  
    byte[] writeKey(String keyName, int dataLength) throws Exception {
-     ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
 +    return writeKey(keyName, repConfig, dataLength);
 +  }
 +
 +  byte[] writeKey(String keyName, ReplicationConfig repConfig, int dataLength)
 +      throws Exception {
      OzoneOutputStream key = TestHelper.createKey(keyName,
 -        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
 +        repConfig, 0, objectStore, volumeName, bucketName);
  
      byte[] inputData = ContainerTestHelper.getFixedLengthString(
          keyString, dataLength).getBytes(UTF_8);
@@@ -190,15 -182,8 +190,15 @@@
  
    byte[] writeRandomBytes(String keyName, int dataLength)
        throws Exception {
-     ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++    ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
 +    return writeRandomBytes(keyName, repConfig, dataLength);
 +  }
 +
 +  byte[] writeRandomBytes(String keyName, ReplicationConfig repConfig,
 +      int dataLength)
 +      throws Exception {
      OzoneOutputStream key = TestHelper.createKey(keyName,
 -        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
 +        repConfig, 0, objectStore, volumeName, bucketName);
  
      byte[] inputData = new byte[dataLength];
      RAND.nextBytes(inputData);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 63e683f,7a4e1c3..dae6e38
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@@ -20,10 -20,14 +20,16 @@@ package org.apache.hadoop.ozone.contain
  
  import java.io.IOException;
  import java.security.MessageDigest;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
  import java.util.concurrent.TimeoutException;
 +
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationType;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index 689214f,cd6af31..4b51107
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@@ -191,10 -179,10 +192,10 @@@ public class TestRandomKeyGenerator 
    }
  
    @Test
-   @org.junit.Ignore("HDDS-5993")
-   public void testCleanObjects() throws Exception {
+   @Flaky("HDDS-5993")
+   public void cleanObjectsTest() throws Exception {
      RandomKeyGenerator randomKeyGenerator =
 -        new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
 +        new RandomKeyGenerator(cluster.getConf());
      randomKeyGenerator.setNumOfVolumes(2);
      randomKeyGenerator.setNumOfBuckets(5);
      randomKeyGenerator.setNumOfKeys(10);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index f56b292,9b9d6d8..7ff190a
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@@ -571,8 -569,6 +571,8 @@@ public class TestOmMetrics 
          .setBucketName(bucketName)
          .setKeyName(keyName)
          .setAcls(Lists.emptyList())
-         .setReplicationConfig(
-             new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE))
++        .setReplicationConfig(RatisReplicationConfig.getInstance(
++            HddsProtos.ReplicationFactor.THREE))
          .build();
    }
    private OmVolumeArgs createVolumeArgs() {
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 54781c2,32dbe04..5e2ef2d
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@@ -175,11 -179,12 +179,11 @@@ public class OMKeyCommitRequestWithFSO 
        // AllocatedBlock. The space occupied by the Key shall be based on
        // the actual Key size, and the total Block size applied before should
        // be subtracted.
 -      long correctedSpace = omKeyInfo.getDataSize() * factor -
 +      long correctedSpace = omKeyInfo.getReplicatedSize() -
-               locationInfoList.size() * scmBlockSize * factor;
+           allocatedLocationInfoList.size() * scmBlockSize * factor;
        // Subtract the size of blocks to be overwritten.
        if (keyToDelete != null) {
 -        correctedSpace -= keyToDelete.getDataSize() *
 -            keyToDelete.getReplicationConfig().getRequiredNodes();
 +        correctedSpace -= keyToDelete.getReplicatedSize();
        }
        omBucketInfo.incrUsedBytes(correctedSpace);
  
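
The hunk above corrects bucket usage at commit time: block allocation pre-charged whole SCM blocks times the replication factor, and commit replaces that charge with the key's actual replicated size, refunding any overwritten key's replicated size as well. A worked sketch with made-up numbers (the block size and counts are illustrative only):

    public final class CommitQuotaSketch {
      public static void main(String[] args) {
        final long scmBlockSize = 256L << 20; // 256 MB, illustrative
        final int factor = 3;                 // Ratis THREE
        final int allocatedBlocks = 2;        // blocks pre-charged at allocate

        // Actual bytes the committed key occupies across all replicas.
        final long replicatedSize = (300L << 20) * factor;

        // Refund the pre-charged allocation, charge the real usage.
        long correctedSpace =
            replicatedSize - (long) allocatedBlocks * scmBlockSize * factor;

        // Refund the replicated size of any key being overwritten.
        final long overwrittenReplicatedSize = 0L;
        correctedSpace -= overwrittenReplicatedSize;

        // Negative delta here: 900 MB charged vs 1536 MB pre-charged.
        System.out.println("usedBytes delta at commit: " + correctedSpace);
      }
    }
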
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index cb69f27,1b327d2..48bcf46
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@@ -32,14 -32,15 +32,17 @@@ import java.util.Map
  import com.google.common.base.Optional;
  import com.google.common.base.Preconditions;
  import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
  import org.apache.hadoop.hdds.client.ReplicationConfig;
  import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+ import org.apache.hadoop.ozone.OmUtils;
  import org.apache.hadoop.ozone.OzoneAcl;
  import org.apache.hadoop.ozone.OzoneConsts;
  import org.apache.hadoop.ozone.om.PrefixManager;
  import org.apache.hadoop.ozone.om.ResolvedBucket;
  import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
++import org.apache.hadoop.ozone.om.helpers.BucketLayout;
  import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
  import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
  import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@@ -48,8 -49,8 +51,8 @@@ import org.apache.hadoop.ozone.om.helpe
  import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
  import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
  import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
--import org.apache.hadoop.ozone.om.helpers.BucketLayout;
 +import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
+ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
  import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
  import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
  import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
diff --cc hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
index 7dc07603,0000000..c888ee7
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
@@@ -1,105 -1,0 +1,105 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +package org.apache.hadoop.ozone.om;
 +
 +import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationFactor;
 +import org.apache.hadoop.hdds.client.ReplicationType;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +/**
 + * Tests the server side replication config preference logic.
 + */
 +public class TestOzoneConfigUtil {
 +  private ReplicationConfig ratis3ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +  private HddsProtos.ReplicationType noneType = HddsProtos.ReplicationType.NONE;
 +  private HddsProtos.ReplicationFactor zeroFactor =
 +      HddsProtos.ReplicationFactor.ZERO;
 +  private HddsProtos.ECReplicationConfig clientECReplicationConfig =
 +      new ECReplicationConfig("rs-3-2-1024K").toProto();
 +  private DefaultReplicationConfig bucketECConfig =
 +      new DefaultReplicationConfig(
 +          new ECReplicationConfig(clientECReplicationConfig));
 +
 +  /**
 +   * Tests EC bucket defaults.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasEC() {
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(noneType, zeroFactor,
 +            clientECReplicationConfig, bucketECConfig, ratis3ReplicationConfig);
 +    // Client has no preference, so it should return the bucket defaults we
 +    // passed.
 +    Assert.assertEquals(bucketECConfig.getEcReplicationConfig(),
 +        replicationConfig);
 +  }
 +
 +  /**
 +   * Tests server defaults.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWithNoClientAndBucketDefaults() {
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(noneType, zeroFactor,
 +            clientECReplicationConfig, null, ratis3ReplicationConfig);
 +    // Client has no preference, no bucket defaults, so it should return server
 +    // defaults.
 +    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * Tests client preference of EC.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenClientPassEC() {
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(HddsProtos.ReplicationType.EC,
 +            zeroFactor, clientECReplicationConfig, null,
 +            ratis3ReplicationConfig);
 +    // Client prefers EC and there are no bucket defaults, so it should
 +    // return the client preference.
 +    Assert.assertEquals(new ECReplicationConfig("rs-3-2-1024K"),
 +        replicationConfig);
 +  }
 +
 +  /**
 +   * Tests bucket ratis defaults.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasEC3() {
 +    DefaultReplicationConfig ratisBucketDefaults =
 +        new DefaultReplicationConfig(ReplicationType.RATIS,
 +            ReplicationFactor.THREE);
 +    ReplicationConfig replicationConfig = OzoneConfigUtil
 +        .resolveReplicationConfigPreference(noneType, zeroFactor,
 +            clientECReplicationConfig, ratisBucketDefaults,
 +            ratis3ReplicationConfig);
 +    // Client has no type preference and the bucket has Ratis defaults, so it
 +    // should return Ratis.
 +    Assert.assertEquals(ratisBucketDefaults.getType().name(),
 +        replicationConfig.getReplicationType().name());
 +    Assert.assertEquals(ratisBucketDefaults.getFactor(),
 +        ReplicationFactor.valueOf(replicationConfig.getRequiredNodes()));
 +  }
 +
 +}
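
All four cases above follow the same precedence: an explicit client choice wins, then the bucket default, then the server default. A minimal sketch of that order with placeholder values; the real OzoneConfigUtil.resolveReplicationConfigPreference signature is richer:

    /** Placeholder resolution; real signature takes proto types and configs. */
    final class ReplicationPreferenceSketch {

      static <T> T resolve(T clientChoice, T bucketDefault, T serverDefault) {
        if (clientChoice != null) {
          return clientChoice; // explicit client request wins
        }
        if (bucketDefault != null) {
          return bucketDefault; // then the bucket default
        }
        return serverDefault; // finally the server-wide default
      }

      public static void main(String[] args) {
        // Bucket default wins when the client states no preference.
        System.out.println(resolve(null, "rs-3-2-1024k", "RATIS/THREE"));
        // Server default applies when neither client nor bucket chose.
        System.out.println(resolve(null, null, "RATIS/THREE"));
      }
    }
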
diff --cc hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
index f977577,a9d4f98..561cc34
--- a/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
+++ b/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
@@@ -37,16 -29,9 +37,16 @@@ import static org.junit.Assert.assertNu
  import static org.mockito.Mockito.mock;
  
  /**
 - * Unit tests for OzoneClientUtils.
 + * Tests the behavior of OzoneClientUtils APIs.
   */
  public class TestOzoneClientUtils {
 +  private ReplicationConfig ecReplicationConfig =
 +      new ECReplicationConfig("rs-3-2-1024K");
 +  private ReplicationConfig ratis3ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +  private ReplicationConfig ratis1ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
 +
    @Test(expected = IllegalArgumentException.class)
    public void testNegativeLength() throws IOException {
      OzoneVolume volume = mock(OzoneVolume.class);
diff --cc hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 96bfe73,145578b..99a08d6
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@@ -174,27 -169,22 +174,28 @@@ public class ObjectEndpoint extends End
            partNumber, uploadID, body);
      }
  
+     String copyHeader = null, storageType = null;
      try {
-       String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
-       String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+       copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+       storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
  
 -      S3StorageType s3StorageType;
 -      boolean storageTypeDefault;
 +      // Normal put object
 +      OzoneBucket bucket = getBucket(bucketName);
 +      ReplicationConfig clientConfiguredReplicationConfig = null;
 +      String replication = ozoneConfiguration.get(OZONE_REPLICATION);
 +      if (replication != null) {
 +        clientConfiguredReplicationConfig = ReplicationConfig.parse(
 +            ReplicationType.valueOf(ozoneConfiguration
 +                .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
 +            replication, ozoneConfiguration);
 +      }
 +      ReplicationConfig replicationConfig = S3Utils
 +          .resolveS3ClientSideReplicationConfig(storageType,
 +              clientConfiguredReplicationConfig, bucket.getReplicationConfig());
 +      boolean storageTypeDefault = false;
        if (storageType == null || storageType.equals("")) {
 -        s3StorageType = S3StorageType.getDefault(ozoneConfiguration);
          storageTypeDefault = true;
 -      } else {
 -        s3StorageType = toS3StorageType(storageType);
 -        storageTypeDefault = false;
        }
 -      ReplicationType replicationType = s3StorageType.getType();
 -      ReplicationFactor replicationFactor = s3StorageType.getFactor();
  
        if (copyHeader != null) {
          //Copy object, as copy source available.
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 0f42b3d,ad12e65..fcddfde
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@@ -80,7 -79,8 +80,7 @@@ public class OzoneBucketStub extends Oz
        long creationTime) {
      super(volumeName,
          bucketName,
-         new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
 -        StandaloneReplicationConfig
 -            .getInstance(HddsProtos.ReplicationFactor.ONE),
++        RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
          storageType,
          versioning,
          creationTime);
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
index 55f2f38,0000000..6b8c9fc
mode 100644,000000..100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
@@@ -1,143 -1,0 +1,143 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.hadoop.ozone.s3.util;
 +
 +import org.apache.hadoop.hdds.client.ECReplicationConfig;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 +import org.apache.hadoop.ozone.s3.exception.OS3Exception;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +/**
 + * Tests the S3Utils APIs.
 + */
 +public class TestS3Utils {
 +  private ReplicationConfig ecReplicationConfig =
 +      new ECReplicationConfig("rs-3-2-1024K");
 +  private ReplicationConfig ratis3ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
 +  private ReplicationConfig ratis1ReplicationConfig =
-       new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
++      RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
 +
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
 +            null, ecReplicationConfig);
 +    // Bucket default is EC.
 +    Assert.assertEquals(ecReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket replication is null, it should respect the user-passed value.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenBucketHasNull()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
 +            null, null);
 +    // Passed replication is 3 - Ozone mapped replication is ratis THREE
 +    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When bucket replication is null, it should return null if the
 +   * user-passed value is empty.
 +   */
 +  @Test
 +  public void testResolveClientSideRepConfigWhenUserPassedReplicationIsEmpty()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig =
 +        S3Utils.resolveS3ClientSideReplicationConfig("", null, null);
 +    // client configured value also null.
 +    // This API caller should leave the decision to server.
 +    Assert.assertNull(replicationConfig);
 +  }
 +
 +  /**
 +   * When the bucket default is non-EC and the client-side values are not
 +   * valid, we just return null so the server can make the decision.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsInvalidButBucketDefaultNonEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(null, null,
 +            ratis3ReplicationConfig);
 +    // The configured client config is also null.
 +    Assert.assertNull(replicationConfig);
 +  }
 +
 +  /**
 +   * When the bucket default is non-EC and the client-side value is valid,
 +   * we should return the valid client-side value.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsValidButBucketDefaultNonEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(
 +            S3StorageType.REDUCED_REDUNDANCY.name(), null,
 +            ratis3ReplicationConfig);
 +    // Passed value is replication one - Ozone mapped value is ratis ONE
 +    Assert.assertEquals(ratis1ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When the bucket default is EC and the client-side value is also valid,
 +   * we just return the bucket default EC.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsValidButBucketDefaultEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
 +            ratis3ReplicationConfig, ecReplicationConfig);
 +    // Bucket default is EC
 +    Assert.assertEquals(ecReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When the bucket default is non-EC, the client passed no valid value, and
 +   * the configured value is valid, we just return the configured value.
 +   */
 +  @Test
 +  public void testResolveRepConfWhenUserPassedIsInvalidAndBucketDefaultNonEC()
 +      throws OS3Exception {
 +    ReplicationConfig replicationConfig = S3Utils
 +        .resolveS3ClientSideReplicationConfig(null, ratis3ReplicationConfig,
 +            ratis1ReplicationConfig);
 +    // Configured value is ratis THREE
 +    Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
 +  }
 +
 +  /**
 +   * When the user-passed storage type is invalid, an OS3Exception should be
 +   * thrown.
 +   */
 +  @Test(expected = OS3Exception.class)
 +  public void testResolveRepConfWhenUserPassedIsInvalid() throws OS3Exception {
 +    S3Utils.resolveS3ClientSideReplicationConfig("INVALID",
 +        ratis3ReplicationConfig, ratis1ReplicationConfig);
 +  }
 +
 +}
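
Read together, the tests pin this resolution order: an EC bucket default always wins; otherwise a valid S3 storage class maps STANDARD to Ratis/THREE and REDUCED_REDUNDANCY to Ratis/ONE; otherwise the client-configured value; otherwise null so the server decides, and an unknown storage class is rejected. A sketch of that order with plain strings standing in for ReplicationConfig (the IllegalArgumentException stands in for the OS3Exception the real helper throws):

    /** Strings stand in for ReplicationConfig values. */
    final class S3ReplicationSketch {

      static String resolve(String storageClass, String clientConfigured,
          String bucketDefault) {
        if ("EC".equals(bucketDefault)) {
          return bucketDefault; // an EC bucket default always wins
        }
        if (storageClass != null && !storageClass.isEmpty()) {
          switch (storageClass) {
          case "STANDARD":
            return "RATIS/THREE";
          case "REDUCED_REDUNDANCY":
            return "RATIS/ONE";
          default:
            // Stand-in for the OS3Exception thrown by the real helper.
            throw new IllegalArgumentException("invalid storage class");
          }
        }
        return clientConfigured; // may be null: let the server decide
      }

      public static void main(String[] args) {
        // Matches testResolveRepConfWhenUserPassedIsValidButBucketDefaultNonEC.
        System.out.println(resolve("REDUCED_REDUNDANCY", null, "RATIS/THREE"));
      }
    }
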

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org