Posted to commits@ozone.apache.org by na...@apache.org on 2020/11/30 13:52:00 UTC

[ozone] 01/01: Merge branch 'master' into HDDS-2823

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit f30bc4ee66681fe9b554cd322be0cedf29d8cde7
Merge: 44a6503 130ba4d
Author: Nandakumar <na...@apache.org>
AuthorDate: Mon Nov 30 16:55:07 2020 +0530

    Merge branch 'master' into HDDS-2823

 .asf.yaml                                          |   2 +-
 README.md                                          |  22 +-
 hadoop-hdds/client/pom.xml                         |   5 -
 .../apache/hadoop/hdds/scm/OzoneClientConfig.java  | 212 +++++++++
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  12 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  66 ++-
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |   2 +-
 .../storage/TestBlockOutputStreamCorrectness.java  |  17 +-
 .../hdds/scm/storage/TestChunkInputStream.java     |  11 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |   5 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  12 +-
 .../java/org/apache/hadoop/hdds/scm/ScmConfig.java |  43 ++
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   4 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  73 +--
 .../org/apache/hadoop/ozone/common/Checksum.java   |  22 +-
 .../common/src/main/resources/ozone-default.xml    |  91 +---
 .../ratis/TestContainerCommandRequestMessage.java  |  13 +-
 .../hadoop/ozone/audit/TestOzoneAuditLogger.java   |   8 +-
 .../common/src/test/resources/auditlog.properties  |   2 +-
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |   1 +
 .../hdds/conf/ConfigurationReflectionUtil.java     |  14 +-
 .../hadoop/hdds/conf/ConfigurationTarget.java      |   4 +
 hadoop-hdds/container-service/pom.xml              |  11 +-
 .../common/statemachine/DatanodeConfiguration.java |  24 +
 .../common/statemachine/DatanodeStateMachine.java  |   6 +
 .../common/statemachine/EndpointStateMachine.java  |  16 +
 .../ClosePipelineCommandHandler.java               |  15 +-
 .../CreatePipelineCommandHandler.java              |  30 +-
 .../states/datanode/RunningDatanodeState.java      |   8 +-
 .../server/ratis/ContainerStateMachine.java        |   4 +-
 .../transport/server/ratis/XceiverServerRatis.java |  37 +-
 .../container/common/utils/ContainerCache.java     |  52 ++-
 .../common/utils/ContainerCacheMetrics.java        | 114 +++++
 .../ozone/container/common/volume/HddsVolume.java  |  12 +-
 .../container/common/volume/VolumeIOStats.java     |  72 ++-
 .../container/keyvalue/KeyValueContainer.java      |   3 +
 .../container/keyvalue/helpers/BlockUtils.java     |  54 +++
 .../keyvalue/helpers/KeyValueContainerUtil.java    | 112 +++--
 .../background/BlockDeletingService.java           |  19 +-
 .../container/metadata/AbstractDatanodeStore.java  |  69 ++-
 .../metadata/DatanodeStoreSchemaOneImpl.java       |   7 +-
 .../metadata/DatanodeStoreSchemaTwoImpl.java       |   7 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  12 +-
 .../replication/DownloadAndImportReplicator.java   |  12 +-
 .../container/replication/GrpcOutputStream.java    |   2 +-
 .../replication/GrpcReplicationClient.java         |  23 +-
 .../replication/SimpleContainerDownloader.java     |  36 +-
 .../protocol/commands/CreatePipelineCommand.java   |  16 +
 .../main/resources/webapps/hddsDatanode/index.html |   2 +-
 .../ozone/container/ContainerTestHelper.java       |   6 +-
 .../container/common/TestBlockDeletingService.java | 319 +++++++------
 .../ozone/container/common/TestContainerCache.java |  10 +-
 .../TestSchemaOneBackwardsCompatibility.java       |  83 ++--
 .../common/impl/TestContainerPersistence.java      |  53 +--
 .../TestCreatePipelineCommandHandler.java          |  13 +-
 .../container/keyvalue/TestKeyValueContainer.java  |  53 ++-
 .../container/ozoneimpl/TestContainerReader.java   |  13 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   2 +
 .../replication/TestReplicationSupervisor.java     |  35 ++
 .../replication/TestSimpleContainerDownloader.java |  85 ++++
 hadoop-hdds/dev-support/checkstyle/checkstyle.xml  |   4 +-
 hadoop-hdds/docs/content/concept/Containers.zh.md  |  47 ++
 hadoop-hdds/docs/content/concept/OzoneManager.md   |   4 +-
 .../docs/content/concept/OzoneManager.zh.md        |  92 +++-
 hadoop-hdds/docs/content/concept/Recon.md          | 163 +++++++
 .../docs/content/concept/ReconHighLevelDesign.png  | Bin 0 -> 239168 bytes
 hadoop-hdds/docs/content/concept/ReconOmDesign.png | Bin 0 -> 162797 bytes
 .../docs/content/concept/ReconScmDesign.png        | Bin 0 -> 181628 bytes
 .../content/concept/StorageContainerManager.md     |   5 +-
 .../content/concept/StorageContainerManager.zh.md  |  69 ++-
 hadoop-hdds/docs/content/feature/GDPR.md           |   2 +-
 hadoop-hdds/docs/content/feature/GDPR.zh.md        |   5 +-
 hadoop-hdds/docs/content/feature/HA.md             |   4 +-
 hadoop-hdds/docs/content/feature/HA.zh.md          | 117 +++++
 hadoop-hdds/docs/content/feature/Quota.md          |  79 ++++
 hadoop-hdds/docs/content/feature/Quota.zh.md       |  74 +++
 hadoop-hdds/docs/content/feature/Recon.md          |  26 +-
 hadoop-hdds/docs/content/feature/Recon.zh.md       |  49 ++
 hadoop-hdds/docs/content/feature/_index.zh.md      |   7 +-
 hadoop-hdds/docs/content/interface/ReconApi.md     | 511 +++++++++++++++++++++
 hadoop-hdds/docs/content/interface/_index.zh.md    |   2 +-
 hadoop-hdds/docs/content/tools/Admin.md            |   4 +-
 hadoop-hdds/framework/pom.xml                      |  20 +-
 .../hdds/conf/DatanodeRatisServerConfig.java       |  19 +-
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       | 358 ++++++++-------
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  14 +-
 .../hadoop/hdds/utils/db/TestDBStoreBuilder.java   |  21 +-
 hadoop-hdds/hadoop-dependency-server/pom.xml       |   6 +
 hadoop-hdds/hadoop-dependency-test/pom.xml         |  12 +
 hadoop-hdds/pom.xml                                |  53 ++-
 hadoop-hdds/server-scm/pom.xml                     |   6 +
 .../apache/hadoop/hdds/scm/block/BlockManager.java |   4 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  50 +-
 .../block/DatanodeDeletedBlockTransactions.java    | 122 ++---
 .../hadoop/hdds/scm/block/DeletedBlockLog.java     |   4 +-
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java |  53 ++-
 .../hdds/scm/block/SCMBlockDeletingService.java    | 135 +++---
 .../IncrementalContainerReportHandler.java         |   4 +
 .../hdds/scm/container/ReplicationManager.java     |   4 +-
 .../hadoop/hdds/scm/ha/SCMHAManagerImpl.java       |   9 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   8 +-
 .../hadoop/hdds/scm/metadata/PipelineCodec.java    |   3 +-
 .../hdds/scm/pipeline/PipelineReportHandler.java   |   8 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   4 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  39 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |   3 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |  45 +-
 .../hdds/scm/server/SCMBlockProtocolServer.java    |  85 ++--
 .../hdds/scm/server/StorageContainerManager.java   |   9 -
 .../hdds/scm/server/ratis/SCMRatisServer.java      |  10 +-
 .../hdds/scm/server/ratis/SCMStateMachine.java     |   1 -
 .../src/main/resources/webapps/scm/index.html      |   6 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |  97 ++--
 .../hdds/scm/container/TestReplicationManager.java |  12 +-
 .../hadoop/hdds/scm/ha/MockSCMHAManager.java       |  26 +-
 .../hadoop/hdds/scm/ha/TestSCMRatisResponse.java   |  28 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |   4 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |   6 +
 .../TestOneReplicaPipelineSafeModeRule.java        |  71 ++-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  25 +
 .../apache/hadoop/ozone/client/OzoneVolume.java    |  17 -
 .../ozone/client/io/BlockOutputStreamEntry.java    | 131 +-----
 .../client/io/BlockOutputStreamEntryPool.java      |  89 +---
 .../hadoop/ozone/client/io/KeyOutputStream.java    | 115 ++---
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  | 109 +----
 hadoop-ozone/common/pom.xml                        |   5 +
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  98 +++-
 .../ozone/om/ha/OMFailoverProxyProvider.java       |   2 +-
 .../hadoop/ozone/om/helpers/OmBucketInfo.java      |  22 +-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      |  33 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   4 +-
 .../hadoop/ozone/security/acl/OzoneObjInfo.java    |  10 +
 .../hadoop/ozone/security/acl/RequestContext.java  |  48 +-
 .../org/apache/hadoop/ozone/util/ExitManager.java  |   2 +-
 .../java/org/apache/hadoop/ozone/TestOmUtils.java  |   5 +
 .../hadoop/ozone/om/lock/TestOzoneManagerLock.java |  52 +--
 hadoop-ozone/datanode/pom.xml                      |  25 +-
 hadoop-ozone/dist/pom.xml                          |  48 +-
 .../docker-image/docker-krb5/Dockerfile-krb5       |   2 +-
 .../compose/ozone-mr/hadoop27/docker-compose.yaml  |   2 +-
 .../compose/ozone-mr/hadoop31/docker-compose.yaml  |   2 +-
 .../dist/src/main/compose/ozone-mr/hadoop32/.env   |   6 +-
 .../compose/ozone-mr/hadoop32/docker-compose.yaml  |   2 +-
 .../dist/src/main/compose/ozonesecure-mr/.env      |   3 +-
 .../compose/ozonesecure-mr/docker-compose.yaml     |  12 +-
 .../dist/src/main/compose/ozonesecure-mr/test.sh   |   1 +
 .../compose/ozonesecure-om-ha/docker-compose.yaml  |   4 +-
 .../main/compose/ozonesecure/docker-compose.yaml   |   2 +
 .../main/smoketest/security/ozone-secure-fs.robot  |  19 +-
 hadoop-ozone/dist/src/shell/ozone/ozone            |  11 +-
 hadoop-ozone/dist/src/shell/ozone/stop-ozone.sh    |   5 +
 .../fault-injection-test/mini-chaos-tests/pom.xml  |   6 +
 .../mini-chaos-tests/src/test/bin/start-chaos.sh   |   2 +-
 .../apache/hadoop/ozone/MiniOzoneChaosCluster.java |  31 +-
 .../hadoop/ozone/MiniOzoneLoadGenerator.java       |   8 +-
 .../apache/hadoop/ozone/OzoneChaosCluster.java}    |  45 +-
 .../hadoop/ozone/TestAllMiniChaosOzoneCluster.java |  55 +++
 .../ozone/TestDatanodeMiniChaosOzoneCluster.java   |  53 +++
 .../hadoop/ozone/TestMiniChaosOzoneCluster.java    | 100 ++--
 .../TestOzoneManagerMiniChaosOzoneCluster.java     |  57 +++
 .../hadoop/ozone/failure/FailureManager.java       |   3 +-
 .../org/apache/hadoop/ozone/failure/Failures.java  |  13 +
 .../ozone/loadgenerators/AgedDirLoadGenerator.java |   1 -
 .../ozone/loadgenerators/AgedLoadGenerator.java    |   6 +-
 .../loadgenerators/FilesystemLoadGenerator.java    |   1 -
 .../{utils => loadgenerators}/LoadBucket.java      |   2 +-
 .../hadoop/ozone/loadgenerators/LoadGenerator.java |  17 +
 .../loadgenerators/NestedDirLoadGenerator.java     |   1 -
 .../loadgenerators/RandomDirLoadGenerator.java     |   1 -
 .../ozone/loadgenerators/RandomLoadGenerator.java  |   1 -
 .../loadgenerators/ReadOnlyLoadGenerator.java      |   1 -
 .../src/test/resources/log4j.properties            |   2 +-
 hadoop-ozone/integration-test/pom.xml              |   6 +
 .../fs/ozone/TestOzoneFSWithObjectStoreCreate.java |  71 ++-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  13 +-
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |  85 +++-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   6 +
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  49 +-
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |   3 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |  18 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    |  32 +-
 .../rpc/TestBlockOutputStreamFlushDelay.java       |  22 +-
 .../rpc/TestBlockOutputStreamWithFailures.java     |  38 +-
 ...estBlockOutputStreamWithFailuresFlushDelay.java |  34 +-
 .../rpc/TestCloseContainerHandlingByClient.java    |  35 +-
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |   7 +-
 .../client/rpc/TestContainerStateMachine.java      |  35 +-
 .../rpc/TestContainerStateMachineFailures.java     |  64 ++-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  13 +-
 .../client/rpc/TestDiscardPreallocatedBlocks.java  |  35 +-
 .../client/rpc/TestFailureHandlingByClient.java    |   7 +-
 .../ozone/client/rpc/TestKeyInputStream.java       |  33 +-
 .../rpc/TestOzoneClientRetriesOnException.java     |  40 +-
 ...estOzoneClientRetriesOnExceptionFlushDelay.java |  31 +-
 .../ozone/client/rpc/TestOzoneRpcClient.java       |   5 +-
 .../client/rpc/TestOzoneRpcClientAbstract.java     | 319 +------------
 .../client/rpc/TestValidateBCSIDOnRestart.java     |  41 +-
 .../ozone/client/rpc/TestWatchForCommit.java       |  46 +-
 .../commandhandler/TestBlockDeletion.java          |  17 +-
 .../container/metrics/TestContainerMetrics.java    |  49 +-
 .../org/apache/hadoop/ozone/om/TestOmAcls.java     |   9 +-
 .../org/apache/hadoop/ozone/om/TestOmMetrics.java  | 105 +++++
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  47 ++
 .../ozone/om/TestOzoneManagerHAKeyDeletion.java    |  77 ++++
 .../ozone/om/TestOzoneManagerHAWithData.java       |  37 --
 .../ozone/om/TestOzoneManagerListVolumes.java      |  36 +-
 .../hadoop/ozone/om/TestOzoneManagerRestart.java   | 100 +++-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       |  42 ++
 .../src/test/resources/contract/ozone.xml          |   5 +
 .../src/main/proto/OmClientProtocol.proto          |   4 +-
 hadoop-ozone/interface-storage/pom.xml             |  14 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  11 +-
 .../hadoop/ozone/om/codec/UserVolumeInfoCodec.java |  18 +-
 .../hadoop/ozone/om/ratis/OMTransactionInfo.java   |   6 +-
 .../src/main/proto/OmStorageProtocol.proto         |   6 +
 hadoop-ozone/ozone-manager/pom.xml                 |   6 +
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |  32 +-
 .../apache/hadoop/ozone/om/KeyDeletingService.java |  19 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  85 ++--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  37 ++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  35 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 180 ++++++--
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  |  31 +-
 .../hadoop/ozone/om/S3SecretManagerImpl.java       |   8 +-
 .../apache/hadoop/ozone/om/TrashPolicyOzone.java   | 235 ++++++++++
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |  79 ++--
 .../hadoop/ozone/om/codec/OMDBDefinition.java      |   6 +-
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |  29 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |  50 +-
 .../ozone/om/ratis/OzoneManagerStateMachine.java   |   2 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  46 +-
 .../om/request/bucket/OMBucketCreateRequest.java   |   4 +-
 .../request/bucket/OMBucketSetPropertyRequest.java |  15 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |  15 +
 .../request/bucket/acl/OMBucketAddAclRequest.java  |  23 +
 .../bucket/acl/OMBucketRemoveAclRequest.java       |  24 +
 .../request/bucket/acl/OMBucketSetAclRequest.java  |  24 +
 .../om/request/file/OMDirectoryCreateRequest.java  |  23 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |   9 +-
 .../ozone/om/request/file/OMFileRequest.java       |  30 --
 .../om/request/key/OMAllocateBlockRequest.java     |  16 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |  14 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   8 +-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |   7 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |  75 +--
 .../ozone/om/request/key/OMKeysDeleteRequest.java  |  11 +-
 .../ozone/om/request/key/OMKeysRenameRequest.java  |   9 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |  15 +
 .../om/request/key/acl/OMKeyAddAclRequest.java     |  24 +
 .../om/request/key/acl/OMKeyRemoveAclRequest.java  |  24 +
 .../om/request/key/acl/OMKeySetAclRequest.java     |  24 +
 .../request/key/acl/prefix/OMPrefixAclRequest.java |   7 +
 .../S3InitiateMultipartUploadRequest.java          |   3 +-
 .../multipart/S3MultipartUploadAbortRequest.java   |   5 +-
 .../S3MultipartUploadCommitPartRequest.java        |  10 +-
 .../hadoop/ozone/om/request/util/ObjectParser.java |   6 +-
 .../om/request/volume/OMVolumeCreateRequest.java   |   8 +-
 .../om/request/volume/OMVolumeDeleteRequest.java   |   4 +-
 .../ozone/om/request/volume/OMVolumeRequest.java   |  24 +-
 .../om/request/volume/OMVolumeSetOwnerRequest.java |  13 +-
 .../om/request/volume/OMVolumeSetQuotaRequest.java |   6 +-
 .../om/request/volume/acl/OMVolumeAclRequest.java  |  14 +
 .../request/volume/acl/OMVolumeAddAclRequest.java  |  24 +
 .../volume/acl/OMVolumeRemoveAclRequest.java       |  24 +
 .../request/volume/acl/OMVolumeSetAclRequest.java  |  24 +
 .../om/response/file/OMFileCreateResponse.java     |   5 +
 .../om/response/key/OMAllocateBlockResponse.java   |   4 -
 .../ozone/om/response/key/OMKeyCommitResponse.java |   4 -
 .../ozone/om/response/key/OMKeyCreateResponse.java |   7 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |   4 -
 .../om/response/key/OMKeysDeleteResponse.java      |   4 -
 .../multipart/S3MultipartUploadAbortResponse.java  |   4 -
 .../S3MultipartUploadCommitPartResponse.java       |   5 +-
 .../om/response/volume/OMVolumeCreateResponse.java |   7 +-
 .../om/response/volume/OMVolumeDeleteResponse.java |  10 +-
 .../response/volume/OMVolumeSetOwnerResponse.java  |  13 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   9 +-
 .../ozone/security/acl/OzoneNativeAuthorizer.java  |  44 +-
 .../apache/hadoop/ozone/om/TestKeyManagerUnit.java |  75 ++-
 .../om/ratis/TestOzoneManagerStateMachine.java     |  10 +-
 .../ozone/om/request/TestOMRequestUtils.java       |  68 ++-
 .../bucket/TestOMBucketSetPropertyRequest.java     |   3 +-
 .../bucket/acl/TestOMBucketAddAclRequest.java      | 119 +++++
 .../bucket/acl/TestOMBucketRemoveAclRequest.java   | 137 ++++++
 .../bucket/acl/TestOMBucketSetAclRequest.java      | 125 +++++
 .../ozone/om/request/bucket/acl/package-info.java} |  30 +-
 .../ozone/om/request/key/TestOMKeyAclRequest.java  | 155 ++++++-
 .../request/volume/TestOMVolumeCreateRequest.java  |  10 +-
 .../volume/TestOMVolumeSetOwnerRequest.java        |   8 +-
 .../volume/TestOMVolumeSetQuotaRequest.java        |   3 +-
 .../volume/acl/TestOMVolumeAddAclRequest.java      |   7 +
 .../volume/acl/TestOMVolumeRemoveAclRequest.java   |   8 +
 .../volume/acl/TestOMVolumeSetAclRequest.java      |   7 +
 .../ozone/om/response/TestCleanupTableInfo.java    | 328 ++++++++++++-
 .../volume/TestOMVolumeCreateResponse.java         |   6 +-
 .../volume/TestOMVolumeDeleteResponse.java         |   8 +-
 .../volume/TestOMVolumeSetOwnerResponse.java       |  11 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java | 298 ++++++++++++
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |  26 +-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |  28 +-
 hadoop-ozone/pom.xml                               |  57 ++-
 .../ozone/recon/codegen/ReconSqlDbConfig.java      |   2 +-
 .../recon/schema/ReconTaskSchemaDefinition.java    |   2 +-
 hadoop-ozone/recon/pom.xml                         |   8 +-
 .../persistence/DefaultDataSourceProvider.java     |   6 +
 .../recon/persistence/DerbyDataSourceProvider.java |   1 -
 .../ozone/recon/tasks/FileSizeCountTask.java       |   4 +-
 .../hadoop/ozone/recon/tasks/ReconTaskConfig.java  |   4 +-
 .../persistence/TestReconWithDifferentSqlDBs.java  |   5 +
 .../recon/tasks/TestContainerKeyMapperTask.java    |   2 +-
 .../ozone/recon/tasks/TestFileSizeCountTask.java   |   6 +
 .../hadoop/ozone/s3/AWSSignatureProcessor.java     |  43 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  61 ++-
 .../apache/hadoop/ozone/s3/SignatureProcessor.java |   2 +
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |   8 +-
 .../ozone/s3/endpoint/ListObjectResponse.java      |  11 +
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   5 +
 .../hadoop/ozone/s3/TestOzoneClientProducer.java   |  25 +-
 .../hadoop/ozone/s3/endpoint/TestBucketPut.java    |   5 +
 .../hadoop/ozone/s3/endpoint/TestRootList.java     |   5 +
 hadoop-ozone/tools/pom.xml                         |   2 +-
 .../hadoop/ozone/freon/DatanodeBlockPutter.java    |   7 +-
 .../freon/FollowerAppendLogEntryGenerator.java     |  11 +-
 .../ozone/freon/LeaderAppendLogEntryGenerator.java |  20 +-
 .../hadoop/ozone/shell/SetSpaceQuotaOptions.java   |   3 +-
 .../java/org/apache/hadoop/ozone/shell/Shell.java  |  24 +-
 pom.xml                                            |  18 +-
 327 files changed, 7924 insertions(+), 3158 deletions(-)

diff --cc hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 39d481e,3d1d689..39dcaba
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@@ -43,5 -43,5 +43,6 @@@ public enum ConfigTag 
    S3GATEWAY,
    DATANODE,
    RECON,
 -  DELETION
++  DELETION,
 +  HA
  }
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 06d34ed,014c76c..8c23237
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@@ -53,12 -56,8 +56,10 @@@ import org.apache.hadoop.ozone.common.B
  import org.apache.hadoop.util.StringUtils;
  
  import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.INVALID_BLOCK_SIZE;
- import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
- import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 +
 +import org.apache.ratis.protocol.exceptions.NotLeaderException;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 5d43a75,aa55480..2fe558f
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@@ -323,27 -322,49 +322,49 @@@ public class DeletedBlockLogImp
    public void close() throws IOException {
    }
  
+   private void getTransaction(DeletedBlocksTransaction tx,
+       DatanodeDeletedBlockTransactions transactions) {
+     try {
+       Set<ContainerReplica> replicas = containerManager
 -          .getContainerReplicas(ContainerID.valueof(tx.getContainerID()));
++          .getContainerReplicas(ContainerID.valueOf(tx.getContainerID()));
+       for (ContainerReplica replica : replicas) {
+         UUID dnID = replica.getDatanodeDetails().getUuid();
+         Set<UUID> dnsWithTransactionCommitted =
+             transactionToDNsCommitMap.get(tx.getTxID());
+         if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
+             .contains(dnID)) {
+           // Transaction need not be sent to dns which have
+           // already committed it
+           transactions.addTransactionToDN(dnID, tx);
+         }
+       }
+     } catch (IOException e) {
+       LOG.warn("Got container info error.", e);
+     }
+   }
+ 
    @Override
-   public Map<Long, Long> getTransactions(
-       DatanodeDeletedBlockTransactions transactions) throws IOException {
+   public DatanodeDeletedBlockTransactions getTransactions(
+       int blockDeletionLimit) throws IOException {
      lock.lock();
      try {
-       Map<Long, Long> deleteTransactionMap = new HashMap<>();
+       DatanodeDeletedBlockTransactions transactions =
+           new DatanodeDeletedBlockTransactions();
        try (TableIterator<Long,
            ? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
                 scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
-         while (iter.hasNext()) {
+         int numBlocksAdded = 0;
+         while (iter.hasNext() && numBlocksAdded < blockDeletionLimit) {
            Table.KeyValue<Long, DeletedBlocksTransaction> keyValue =
                iter.next();
-           DeletedBlocksTransaction block = keyValue.getValue();
-           if (block.getCount() > -1 && block.getCount() <= maxRetry) {
-             if (transactions.addTransaction(block,
-                 transactionToDNsCommitMap.get(block.getTxID()))) {
-               deleteTransactionMap.put(block.getContainerID(),
-                   block.getTxID());
-               transactionToDNsCommitMap
-                   .putIfAbsent(block.getTxID(), new LinkedHashSet<>());
-             }
+           DeletedBlocksTransaction txn = keyValue.getValue();
 -          final ContainerID id = ContainerID.valueof(txn.getContainerID());
++          final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+           if (txn.getCount() > -1 && txn.getCount() <= maxRetry
+               && !containerManager.getContainer(id).isOpen()) {
+             numBlocksAdded += txn.getLocalIDCount();
+             getTransaction(txn, transactions);
+             transactionToDNsCommitMap
+                 .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
            }
          }
        }
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index c86c7d9,0000000..33f408d
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@@ -1,170 -1,0 +1,177 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements. See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership. The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * <p>http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * <p>Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.ha;
 +
 +import com.google.common.base.Preconditions;
 +import java.util.List;
 +import java.util.stream.Collectors;
 +import org.apache.hadoop.hdds.conf.ConfigurationSource;
 +import org.apache.ratis.protocol.RaftGroupMemberId;
 +import org.apache.ratis.protocol.RaftPeer;
 +import org.apache.ratis.protocol.RaftPeerId;
 +import org.apache.ratis.protocol.exceptions.NotLeaderException;
 +import org.apache.ratis.server.RaftServer;
 +import org.apache.ratis.server.impl.RaftServerImpl;
 +import org.apache.ratis.server.impl.RaftServerProxy;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +
 +/**
 + * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
 + * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
 + * nodes.
 + *
 + * TODO
 + *
 + */
 +public class SCMHAManagerImpl implements SCMHAManager {
 +
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(SCMHAManagerImpl.class);
 +
 +  private final SCMRatisServerImpl ratisServer;
 +  private final ConfigurationSource conf;
 +
 +  /**
 +   * Creates SCMHAManager instance.
 +   */
 +  public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
 +    this.conf = conf;
 +    this.ratisServer = new SCMRatisServerImpl(
 +        conf.getObject(SCMHAConfiguration.class), conf);
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public void start() throws IOException {
 +    ratisServer.start();
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public boolean isLeader() {
 +    if (!SCMHAUtils.isSCMHAEnabled(conf)) {
 +      // When SCM HA is not enabled, the current SCM is always the leader.
 +      return true;
 +    }
 +    RaftServer server = ratisServer.getServer();
 +    Preconditions.checkState(server instanceof RaftServerProxy);
 +    RaftServerImpl serverImpl = null;
 +    try {
 +      // SCM only has one raft group.
 +      serverImpl = ((RaftServerProxy) server)
 +          .getImpl(ratisServer.getRaftGroupId());
 +      if (serverImpl != null) {
 +        // Only when it's sure the current SCM is the leader, otherwise
 +        // it should all return false.
 +        return serverImpl.isLeader();
 +      }
 +    } catch (IOException ioe) {
 +      LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
 +          "whether it's leader. ", ioe);
 +    }
 +
 +    return false;
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public SCMRatisServer getRatisServer() {
 +    return ratisServer;
 +  }
 +
 +  private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
++    /*
++      TODO: Fix Me
++              Ratis API has changed.
++              RaftServerImpl#getRoleInfoProto is no more public.
++
 +    if (serverImpl.isLeader()) {
 +      return RaftPeerId.getRaftPeerId(
 +          serverImpl.getRoleInfoProto().getLeaderInfo().toString());
 +    } else if (serverImpl.isFollower()) {
 +      return RaftPeerId.valueOf(
 +          serverImpl.getRoleInfoProto().getFollowerInfo()
 +              .getLeaderInfo().getId().getId());
 +    } else {
 +      return null;
 +    }
++     */
++    return null;
 +  }
 +
 +  @Override
 +  public RaftPeer getSuggestedLeader() {
 +    RaftServer server = ratisServer.getServer();
 +    Preconditions.checkState(server instanceof RaftServerProxy);
 +    RaftServerImpl serverImpl = null;
 +    try {
 +      // SCM only has one raft group.
 +      serverImpl = ((RaftServerProxy) server)
 +          .getImpl(ratisServer.getRaftGroupId());
 +      if (serverImpl != null) {
 +        RaftPeerId peerId =  getPeerIdFromRoleInfo(serverImpl);
 +        if (peerId != null) {
-           return new RaftPeer(peerId);
++          return RaftPeer.newBuilder().setId(peerId).build();
 +        }
 +        return null;
 +      }
 +    } catch (IOException ioe) {
 +      LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
 +          "whether it's leader. ", ioe);
 +    }
 +    return null;
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public void shutdown() throws IOException {
 +    ratisServer.stop();
 +  }
 +
 +  @Override
 +  public List<String> getRatisRoles() {
 +    return getRatisServer()
 +            .getRaftPeers()
 +            .stream()
 +            .map(peer -> peer.getAddress() == null ? "" : peer.getAddress())
 +            .collect(Collectors.toList());
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public NotLeaderException triggerNotLeaderException() {
 +    return new NotLeaderException(RaftGroupMemberId.valueOf(
 +        ratisServer.getServer().getId(),
 +        ratisServer.getRaftGroupId()),
 +        getSuggestedLeader(),
 +        ratisServer.getRaftPeers());
 +  }
 +}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 8611b1f,0000000..ab766c9
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@@ -1,224 -1,0 +1,228 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p/>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p/>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.ha;
 +
 +import java.io.IOException;
 +import java.net.InetAddress;
 +import java.net.InetSocketAddress;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.UUID;
 +import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.stream.Collectors;
 +
 +import org.apache.hadoop.hdds.HddsUtils;
 +import org.apache.hadoop.hdds.conf.ConfigurationSource;
 +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.ratis.conf.RaftProperties;
 +import org.apache.ratis.protocol.ClientId;
 +import org.apache.ratis.protocol.RaftClientReply;
 +import org.apache.ratis.protocol.RaftClientRequest;
 +import org.apache.ratis.protocol.RaftGroup;
 +import org.apache.ratis.protocol.RaftGroupId;
 +import org.apache.ratis.protocol.RaftPeer;
 +import org.apache.ratis.protocol.RaftPeerId;
 +import org.apache.ratis.server.RaftServer;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +/**
 + * TODO.
 + */
 +public class SCMRatisServerImpl implements SCMRatisServer {
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(SCMRatisServerImpl.class);
 +
 +  private final InetSocketAddress address;
 +  private final RaftServer server;
 +  private final RaftGroupId raftGroupId;
 +  private final RaftGroup raftGroup;
 +  private final RaftPeerId raftPeerId;
 +  private final SCMStateMachine scmStateMachine;
 +  private final ClientId clientId = ClientId.randomId();
 +  private final AtomicLong callId = new AtomicLong();
 +
 +  // TODO: Refactor and remove ConfigurationSource and use only
 +  //  SCMHAConfiguration.
 +  SCMRatisServerImpl(final SCMHAConfiguration haConf,
 +                     final ConfigurationSource conf)
 +      throws IOException {
 +    this.address = haConf.getRatisBindAddress();
 +
 +    SCMHAGroupBuilder scmHAGroupBuilder = new SCMHAGroupBuilder(haConf, conf);
 +    this.raftPeerId = scmHAGroupBuilder.getPeerId();
 +    this.raftGroupId = scmHAGroupBuilder.getRaftGroupId();
 +    this.raftGroup = scmHAGroupBuilder.getRaftGroup();
 +
 +    final RaftProperties serverProperties = RatisUtil
 +        .newRaftProperties(haConf, conf);
 +    this.scmStateMachine = new SCMStateMachine();
 +    this.server = RaftServer.newBuilder()
 +        .setServerId(raftPeerId)
 +        .setGroup(raftGroup)
 +        .setProperties(serverProperties)
 +        .setStateMachine(scmStateMachine)
 +        .build();
 +  }
 +
 +  @Override
 +  public void start() throws IOException {
 +    server.start();
 +  }
 +
 +  @Override
 +  public void registerStateMachineHandler(final RequestType handlerType,
 +                                          final Object handler) {
 +    scmStateMachine.registerHandler(handlerType, handler);
 +  }
 +
 +  @Override
 +  public SCMRatisResponse submitRequest(SCMRatisRequest request)
 +      throws IOException, ExecutionException, InterruptedException {
 +    final RaftClientRequest raftClientRequest = new RaftClientRequest(
 +        clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
 +        RaftClientRequest.writeRequestType(), null);
 +    final RaftClientReply raftClientReply =
 +        server.submitClientRequestAsync(raftClientRequest).get();
 +    return SCMRatisResponse.decode(raftClientReply);
 +  }
 +
 +  private long nextCallId() {
 +    return callId.getAndIncrement() & Long.MAX_VALUE;
 +  }
 +
 +  @Override
 +  public void stop() throws IOException {
 +    server.close();
 +  }
 +
 +  @Override
 +  public RaftServer getServer() {
 +    return server;
 +  }
 +
 +  @Override
 +  public RaftGroupId getRaftGroupId() {
 +    return raftGroupId;
 +  }
 +
 +  @Override
 +  public List<RaftPeer> getRaftPeers() {
-     return Collections.singletonList(new RaftPeer(raftPeerId));
++    return Collections.singletonList(RaftPeer.newBuilder()
++        .setId(raftPeerId).build());
 +  }
 +
 +
 +  /**
 +   * If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES},
 +   * its raft peers should locate on different nodes, and use the same port
 +   * to communicate with each other.
 +   *
 +   * Each of the raft peer figures out its {@link RaftPeerId} by computing
 +   * its position in {@link ScmConfigKeys#OZONE_SCM_NAMES}.
 +   *
 +   * Assume {@link ScmConfigKeys#OZONE_SCM_NAMES} is "ip0,ip1,ip2",
 +   * scm with ip0 identifies its {@link RaftPeerId} as scm0,
 +   * scm with ip1 identifies its {@link RaftPeerId} as scm1,
 +   * scm with ip2 identifies its {@link RaftPeerId} as scm2.
 +   *
 +   * After startup, they will form a {@link RaftGroup} with groupID
 +   * "SCM-HA-Service", and communicate with each other via
 +   * ozone.scm.ha.ratis.bind.port.
 +   */
 +  private static class SCMHAGroupBuilder {
 +    private final static String SCM_SERVICE_ID = "SCM-HA-Service";
 +
 +    private final RaftGroupId raftGroupId;
 +    private final RaftGroup raftGroup;
 +    private RaftPeerId selfPeerId;
 +
 +    /**
 +     * @return raft group
 +     */
 +    public RaftGroup getRaftGroup() {
 +      return raftGroup;
 +    }
 +
 +    /**
 +     * @return raft group id
 +     */
 +    public RaftGroupId getRaftGroupId() {
 +      return raftGroupId;
 +    }
 +
 +    /**
 +     * @return raft peer id
 +     */
 +    public RaftPeerId getPeerId() {
 +      return selfPeerId;
 +    }
 +
 +    SCMHAGroupBuilder(final SCMHAConfiguration haConf,
 +                      final ConfigurationSource conf) throws IOException {
 +      // fetch port
 +      int port = haConf.getRatisBindAddress().getPort();
 +
 +      // fetch localhost
 +      InetAddress localHost = InetAddress.getLocalHost();
 +
 +      // fetch hosts from ozone.scm.names
 +      List<String> hosts =
 +          Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
 +              .map(scmName -> HddsUtils.getHostName(scmName).get())
 +              .collect(Collectors.toList());
 +
 +      final List<RaftPeer> raftPeers = new ArrayList<>();
 +      for (int i = 0; i < hosts.size(); ++i) {
 +        String nodeId = "scm" + i;
 +        RaftPeerId peerId = RaftPeerId.getRaftPeerId(nodeId);
 +
 +        String host = hosts.get(i);
 +        if (InetAddress.getByName(host).equals(localHost)) {
 +          selfPeerId = peerId;
 +        }
 +
-         raftPeers.add(new RaftPeer(peerId, host + ":" + port));
++        raftPeers.add(RaftPeer.newBuilder()
++            .setId(peerId)
++            .setAddress(host + ":" + port)
++            .build());
 +      }
 +
 +      if (selfPeerId == null) {
 +        String errorMessage = "localhost " +  localHost
 +            + " does not exist in ozone.scm.names "
 +            + conf.get(ScmConfigKeys.OZONE_SCM_NAMES);
 +        throw new IOException(errorMessage);
 +      }
 +
 +      LOG.info("Build a RaftGroup for SCMHA, " +
 +              "localHost: {}, OZONE_SCM_NAMES: {}, selfPeerId: {}",
 +          localHost, conf.get(ScmConfigKeys.OZONE_SCM_NAMES), selfPeerId);
 +
 +      raftGroupId = RaftGroupId.valueOf(UUID.nameUUIDFromBytes(
 +          SCM_SERVICE_ID.getBytes(StandardCharsets.UTF_8)));
 +
 +      raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
 +    }
 +  }
 +}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
index 9ab8c66,0000000..3cb56a6
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
@@@ -1,590 -1,0 +1,596 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.server.ratis;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.base.Strings;
 +import org.apache.hadoop.hdds.conf.Config;
 +import org.apache.hadoop.hdds.conf.ConfigGroup;
 +import org.apache.hadoop.hdds.conf.ConfigType;
 +import org.apache.hadoop.hdds.conf.ConfigurationSource;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
 +import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 +import org.apache.hadoop.hdds.server.ServerUtils;
 +import org.apache.ratis.RaftConfigKeys;
 +import org.apache.ratis.conf.RaftProperties;
 +import org.apache.ratis.grpc.GrpcConfigKeys;
 +import org.apache.ratis.netty.NettyConfigKeys;
 +import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 +import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
 +import org.apache.ratis.protocol.ClientId;
 +import org.apache.ratis.protocol.GroupInfoReply;
 +import org.apache.ratis.protocol.GroupInfoRequest;
 +import org.apache.ratis.protocol.RaftGroup;
 +import org.apache.ratis.protocol.RaftGroupId;
 +import org.apache.ratis.protocol.RaftPeer;
 +import org.apache.ratis.protocol.RaftPeerId;
 +import org.apache.ratis.rpc.RpcType;
 +import org.apache.ratis.rpc.SupportedRpcType;
 +import org.apache.ratis.server.RaftServer;
 +import org.apache.ratis.server.RaftServerConfigKeys;
 +import org.apache.ratis.server.protocol.TermIndex;
 +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 +import org.apache.ratis.util.LifeCycle;
 +import org.apache.ratis.util.SizeInBytes;
 +import org.apache.ratis.util.TimeDuration;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.net.InetSocketAddress;
 +import java.nio.charset.StandardCharsets;
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.List;
 +import java.util.Optional;
 +import java.util.UUID;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.ScheduledExecutorService;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
 +import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
 +import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
 +import static org.apache.hadoop.hdds.conf.ConfigTag.HA;
 +
 +/**
 + * Class for SCM Ratis Server.
 + */
 +public final class SCMRatisServer {
 +  private static final Logger LOG = LoggerFactory
 +      .getLogger(SCMRatisServer.class);
 +
 +  private final StorageContainerManager scm;
 +  private final SCMStateMachine scmStateMachine;
 +
 +  private final String storageDir;
 +  private final int port;
 +  private final InetSocketAddress scmRatisAddress;
 +  private final RaftServer server;
 +  private final RaftGroupId raftGroupId;
 +  private final RaftGroup raftGroup;
 +  private final RaftPeerId raftPeerId;
 +
 +  private final ClientId clientId = ClientId.randomId();
 +  private final ScheduledExecutorService scheduledRoleChecker;
 +  private long roleCheckInitialDelayMs = 1000; // 1 second default
 +  private long roleCheckIntervalMs;
 +  private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
 +  private Optional<RaftPeerRole> cachedPeerRole = Optional.empty();
 +  private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();
 +
 +  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 +  private static long nextCallId() {
 +    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
 +  }
 +
 +  /**
 +   * Creates a SCM Ratis Server.
 +   * @throws IOException
 +   */
 +  private SCMRatisServer(SCMRatisServerConfiguration conf,
 +                         StorageContainerManager scm, String ratisStorageDir,
 +                         String raftGroupIdStr, RaftPeerId localRaftPeerId,
 +                         InetSocketAddress addr, List<RaftPeer> raftPeers)
 +      throws IOException {
 +    this.scm = scm;
 +    this.scmRatisAddress = addr;
 +    this.port = addr.getPort();
 +    this.storageDir = ratisStorageDir;
 +    RaftProperties serverProperties = newRaftProperties(conf);
 +
 +    this.raftPeerId = localRaftPeerId;
 +    this.raftGroupId = RaftGroupId.valueOf(
 +        getRaftGroupIdFromOmServiceId(raftGroupIdStr));
 +    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
 +
 +    StringBuilder raftPeersStr = new StringBuilder();
 +    for (RaftPeer peer : raftPeers) {
 +      raftPeersStr.append(", ").append(peer.getAddress());
 +    }
 +    LOG.info("Instantiating SCM Ratis server with GroupID: {} and " +
 +        "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
 +    this.scmStateMachine = getStateMachine();
 +
 +    this.server = RaftServer.newBuilder()
 +        .setServerId(this.raftPeerId)
 +        .setGroup(this.raftGroup)
 +        .setProperties(serverProperties)
 +        .setStateMachine(scmStateMachine)
 +        .build();
 +
 +    // Run a scheduler to check and update the server role on the leader
 +    // periodically
 +    this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
 +    this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
 +      @Override
 +      public void run() {
 +        // Run this check only on the leader OM
 +        if (cachedPeerRole.isPresent() &&
 +            cachedPeerRole.get() == RaftPeerRole.LEADER) {
 +          updateServerRole();
 +        }
 +      }
 +    }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
 +  }
 +
 +  /**
 +   * Create a SCM Ratis Server instance.
 +   */
 +  public static SCMRatisServer newSCMRatisServer(
 +      SCMRatisServerConfiguration conf, StorageContainerManager scm,
 +      SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers,
 +      String ratisStorageDir)
 +      throws IOException {
 +    String scmServiceId = scmNodeDetails.getSCMServiceId();
 +
 +    String scmNodeId = scmNodeDetails.getSCMNodeId();
 +    RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
 +    InetSocketAddress ratisAddr = new InetSocketAddress(
 +        scmNodeDetails.getAddress(), scmNodeDetails.getRatisPort());
 +
-     RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
++    RaftPeer localRaftPeer = RaftPeer.newBuilder()
++        .setId(localRaftPeerId)
++        .setAddress(ratisAddr)
++        .build();
 +
 +    List<RaftPeer> raftPeers = new ArrayList<>();
 +    raftPeers.add(localRaftPeer);
 +
 +    for (SCMNodeDetails peer : peers) {
 +      String peerNodeId = peer.getSCMNodeId();
 +      InetSocketAddress peerRatisAddr = new InetSocketAddress(
 +          peer.getAddress(), peer.getRatisPort());
 +      RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
-       RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
++      RaftPeer raftPeer = RaftPeer.newBuilder()
++          .setId(raftPeerId)
++          .setAddress(peerRatisAddr)
++          .build();
 +      // Add other SCMs in Ratis ring
 +      raftPeers.add(raftPeer);
 +    }
 +
 +    return new SCMRatisServer(conf, scm, ratisStorageDir, scmServiceId,
 +        localRaftPeerId, ratisAddr, raftPeers);
 +  }
 +
 +  private UUID getRaftGroupIdFromOmServiceId(String scmServiceId) {
 +    return UUID.nameUUIDFromBytes(scmServiceId.getBytes(
 +        StandardCharsets.UTF_8));
 +  }
 +
 +  private SCMStateMachine getStateMachine() {
 +    return new SCMStateMachine(this);
 +  }
 +
 +  private RaftProperties newRaftProperties(SCMRatisServerConfiguration conf) {
 +    final RaftProperties properties = new RaftProperties();
 +    // Set RPC type
 +    final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(conf.getRpcType());
 +    RaftConfigKeys.Rpc.setType(properties, rpc);
 +    // Set the ratis port number
 +    if (rpc == SupportedRpcType.GRPC) {
 +      GrpcConfigKeys.Server.setPort(properties, port);
 +    } else if (rpc == SupportedRpcType.NETTY) {
 +      NettyConfigKeys.Server.setPort(properties, port);
 +    }
 +    // Set Ratis storage directory
 +    RaftServerConfigKeys.setStorageDir(properties,
 +        Collections.singletonList(new File(storageDir)));
 +    // Set RAFT segment size
 +    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
 +        SizeInBytes.valueOf((long)conf.getSegmentSize()));
 +    // Set RAFT segment pre-allocated size
 +    RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
 +        (int)conf.getLogAppenderQueueByteLimit());
 +    RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
 +        SizeInBytes.valueOf(conf.getLogAppenderQueueNum()));
 +    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
 +        SizeInBytes.valueOf((int)conf.getPreallocatedSize()));
 +    RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
 +        false);
 +    RaftServerConfigKeys.Log.setPurgeGap(properties, conf.getLogPurgeGap());
 +    // For grpc set the maximum message size
 +    // TODO: calculate the optimal max message size
 +    GrpcConfigKeys.setMessageSizeMax(properties,
 +        SizeInBytes.valueOf((int)conf.getLogAppenderQueueByteLimit()));
 +
 +    // Set the server request timeout
 +    final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
 +        conf.getRequestTimeout(), TimeUnit.MILLISECONDS);
 +    RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
 +        serverRequestTimeout);
 +    // Set timeout for server retry cache entry
 +    final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
 +        conf.getRetryCacheTimeout(), TimeUnit.MILLISECONDS);
 +    RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
 +        retryCacheTimeout);
 +    // Set the server min and max timeout
 +    final TimeDuration serverMinTimeout = TimeDuration.valueOf(
 +        conf.getMinTimeout(), TimeUnit.MILLISECONDS);
 +    final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
 +        conf.getMinTimeout() + 200L, TimeUnit.MILLISECONDS);
 +    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
 +        serverMinTimeout);
 +    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
 +        serverMaxTimeout);
 +    // Set the number of maximum cached segments
 +    RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
 +    // TODO: set max write buffer size
 +    // Set the ratis leader election timeout
 +    final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
 +        conf.getMinLeaderElectionTimeout(), TimeUnit.MILLISECONDS);
 +    RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
 +        leaderElectionMinTimeout);
 +    long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
 +        TimeUnit.MILLISECONDS) + 200;
 +    RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
 +        TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
 +
 +    final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
 +        conf.getFailureTimeout(), TimeUnit.MILLISECONDS);
 +    RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
 +        nodeFailureTimeout);
 +    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
 +        nodeFailureTimeout);
 +
 +    // Ratis leader role check
 +    this.roleCheckIntervalMs = conf.getRoleCheckerInterval();
 +    this.roleCheckInitialDelayMs = leaderElectionMinTimeout
 +        .toLong(TimeUnit.MILLISECONDS);
 +
 +    return properties;
 +  }
 +
 +  /**
 +   * Start the Ratis server.
 +   * @throws IOException
 +   */
 +  public void start() throws IOException {
 +    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
 +        server.getId(), port);
 +    server.start();
 +  }
 +
 +  /**
 +   * Stop the Ratis server.
 +   */
 +  public void stop() {
 +    try {
 +      server.close();
 +      scmStateMachine.stop();
 +    } catch (IOException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  private boolean checkCachedPeerRoleIsLeader() {
 +    this.roleCheckLock.readLock().lock();
 +    try {
 +      if (cachedPeerRole.isPresent() &&
 +          cachedPeerRole.get() ==RaftPeerRole.LEADER) {
 +        return true;
 +      }
 +      return false;
 +    } finally {
 +      this.roleCheckLock.readLock().unlock();
 +    }
 +  }
 +
 +  public boolean isLeader() {
 +    if (checkCachedPeerRoleIsLeader()) {
 +      return true;
 +    }
 +
 +    // Get the server role from ratis server and update the cached values.
 +    updateServerRole();
 +
 +    // After updating the server role, check and return if leader or not.
 +    return checkCachedPeerRoleIsLeader();
 +  }
 +
 +  @VisibleForTesting
 +  public LifeCycle.State getServerState() {
 +    return server.getLifeCycleState();
 +  }
 +
 +  @VisibleForTesting
 +  public RaftPeerId getRaftPeerId() {
 +    return this.raftPeerId;
 +  }
 +
 +  public RaftGroup getRaftGroup() {
 +    return this.raftGroup;
 +  }
 +
 +  /**
 +   * Get the local directory where ratis logs will be stored.
 +   */
 +  public static String getSCMRatisDirectory(ConfigurationSource conf) {
 +    String storageDir = conf.get(ScmConfigKeys.OZONE_SCM_RATIS_STORAGE_DIR);
 +
 +    if (Strings.isNullOrEmpty(storageDir)) {
 +      storageDir = ServerUtils.getDefaultRatisDirectory(conf);
 +    }
 +    return storageDir;
 +  }
 +
 +  public Optional<RaftPeerId> getCachedLeaderPeerId() {
 +    this.roleCheckLock.readLock().lock();
 +    try {
 +      return cachedLeaderPeerId;
 +    } finally {
 +      this.roleCheckLock.readLock().unlock();
 +    }
 +  }
 +
 +  public StorageContainerManager getSCM() {
 +    return scm;
 +  }
 +
 +  @VisibleForTesting
 +  public SCMStateMachine getScmStateMachine() {
 +    return scmStateMachine;
 +  }
 +
 +  public int getServerPort() {
 +    return port;
 +  }
 +
 +  public void updateServerRole() {
 +    try {
 +      GroupInfoReply groupInfo = getGroupInfo();
 +      RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
 +      RaftPeerRole thisNodeRole = roleInfoProto.getRole();
 +
 +      if (thisNodeRole.equals(RaftPeerRole.LEADER)) {
 +        setServerRole(thisNodeRole, raftPeerId);
 +
 +      } else if (thisNodeRole.equals(RaftPeerRole.FOLLOWER)) {
 +        ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
 +            .getLeaderInfo().getId().getId();
 +        // There may be a chance, here we get leaderNodeId as null. For
 +        // example, in 3 node OM Ratis, if 2 OM nodes are down, there will
 +        // be no leader.
 +        RaftPeerId leaderPeerId = null;
 +        if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
 +          leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
 +        }
 +
 +        setServerRole(thisNodeRole, leaderPeerId);
 +
 +      } else {
 +        setServerRole(thisNodeRole, null);
 +
 +      }
 +    } catch (IOException e) {
 +      LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
 +          "{} and resetting leader info.", RaftPeerRole.UNRECOGNIZED, e);
 +      setServerRole(null, null);
 +    }
 +  }
 +
 +  public TermIndex getLastAppliedTermIndex() {
 +    return scmStateMachine.getLastAppliedTermIndex();
 +  }
 +
 +  private GroupInfoReply getGroupInfo() throws IOException {
 +    GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
 +        raftPeerId, raftGroupId, nextCallId());
 +    GroupInfoReply groupInfo = server.getGroupInfo(groupInfoRequest);
 +    return groupInfo;
 +  }
 +
 +  private void setServerRole(RaftPeerRole currentRole,
 +                             RaftPeerId leaderPeerId) {
 +    this.roleCheckLock.writeLock().lock();
 +    try {
 +      this.cachedPeerRole = Optional.ofNullable(currentRole);
 +      this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
 +    } finally {
 +      this.roleCheckLock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Configuration used by SCM Ratis Server.
 +   */
 +  @ConfigGroup(prefix = "ozone.scm.ratis")
 +  public static class SCMRatisServerConfiguration {
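 +    // The keys below are resolved under the "ozone.scm.ratis" prefix from
 +    // @ConfigGroup, e.g. "segment.size" becomes "ozone.scm.ratis.segment.size".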
 +    @Config(key = "rpc.type",
 +        type = ConfigType.STRING,
 +        defaultValue = "GRPC",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "Ratis supports different kinds of transports like" +
 +            " netty, GRPC, Hadoop RPC etc. This picks one of those for" +
 +            " this cluster."
 +    )
 +    private String rpcType;
 +
 +    @Config(key = "segment.size",
 +        type = ConfigType.SIZE,
 +        defaultValue = "16KB",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The size of the raft segment used by Apache Ratis on" +
 +            " SCM. (16 KB by default)"
 +    )
 +    private double segmentSize = 16 * 1024;
 +
 +    @Config(key = "segment.preallocated.size",
 +        type = ConfigType.SIZE,
 +        defaultValue = "16KB",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The size of the buffer which is preallocated for" +
 +            " the raft segment used by Apache Ratis on SCM. (16 KB by default)"
 +    )
 +    private double preallocatedSize = 16 * 1024;
 +
 +    @Config(key = "log.appender.queue.num-elements",
 +        type = ConfigType.INT,
 +        defaultValue = "1024",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "Number of operation pending with Raft's Log Worker."
 +    )
 +    private int logAppenderQueueNum = 1024;
 +
 +    @Config(key = "log.appender.queue.byte-limit",
 +        type = ConfigType.SIZE,
 +        defaultValue = "32MB",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "Byte limit for Raft's Log Worker queue."
 +    )
 +    private double logAppenderQueueByteLimit = 32 * 1024 * 1024;
 +
 +    @Config(key = "log.purge.gap",
 +        type = ConfigType.INT,
 +        defaultValue = "1000000",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The minimum gap between log indices for Raft server to" +
 +            " purge its log segments after taking snapshot."
 +    )
 +    private int logPurgeGap = 1000000;
 +
 +    @Config(key = "server.request.timeout",
 +        type = ConfigType.TIME,
 +        defaultValue = "3s",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The timeout duration for SCM's ratis server request."
 +    )
 +    private long requestTimeout = 3 * 1000L;
 +
 +    @Config(key = "server.retry.cache.timeout",
 +        type = ConfigType.TIME,
 +        defaultValue = "60s",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "Retry Cache entry timeout for SCM's ratis server."
 +    )
 +    private long retryCacheTimeout = 60 * 1000L;
 +
 +    @Config(key = "minimum.timeout",
 +        type = ConfigType.TIME,
 +        defaultValue = "1s",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The minimum timeout duration for SCM's Ratis server rpc."
 +    )
 +    private long minTimeout = 1 * 1000L;
 +
 +    @Config(key = "leader.election.minimum.timeout.duration",
 +        type = ConfigType.TIME,
 +        defaultValue = "1s",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The minimum timeout duration for SCM ratis leader" +
 +            " election. Default is 1s."
 +    )
 +    private long minLeaderElectionTimeout = 1 * 1000L;
 +
 +    @Config(key = "server.failure.timeout.duration",
 +        type = ConfigType.TIME,
 +        defaultValue = "120s",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The timeout duration for ratis server failure" +
 +            " detection, once the threshold has reached, the ratis state" +
 +            " machine will be informed about the failure in the ratis ring."
 +    )
 +    private long failureTimeout = 120 * 1000L;
 +
 +    @Config(key = "server.role.check.interval",
 +        type = ConfigType.TIME,
 +        defaultValue = "15s",
 +        tags = {SCM, OZONE, HA, RATIS},
 +        description = "The interval between SCM leader performing a role" +
 +            " check on its ratis server. Ratis server informs SCM if it loses" +
 +            " the leader role. The scheduled check is an secondary check to" +
 +            " ensure that the leader role is updated periodically"
 +    )
 +    private long roleCheckerInterval = 15 * 1000L;
 +
 +    public String getRpcType() {
 +      return rpcType;
 +    }
 +
 +    public double getSegmentSize() {
 +      return segmentSize;
 +    }
 +
 +    public double getPreallocatedSize() {
 +      return preallocatedSize;
 +    }
 +
 +    public int getLogAppenderQueueNum() {
 +      return logAppenderQueueNum;
 +    }
 +
 +    public double getLogAppenderQueueByteLimit() {
 +      return logAppenderQueueByteLimit;
 +    }
 +
 +    public int getLogPurgeGap() {
 +      return logPurgeGap;
 +    }
 +
 +    public long getRequestTimeout() {
 +      return requestTimeout;
 +    }
 +
 +    public long getRetryCacheTimeout() {
 +      return retryCacheTimeout;
 +    }
 +
 +    public long getMinTimeout() {
 +      return minTimeout;
 +    }
 +
 +    public long getMinLeaderElectionTimeout() {
 +      return minLeaderElectionTimeout;
 +    }
 +
 +    public long getFailureTimeout() {
 +      return failureTimeout;
 +    }
 +
 +    public long getRoleCheckerInterval() {
 +      return roleCheckerInterval;
 +    }
 +  }
 +}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
index 9a725a6,0000000..1bc71e4
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
@@@ -1,242 -1,0 +1,241 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.server.ratis;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import com.google.common.util.concurrent.ThreadFactoryBuilder;
 +import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 +import org.apache.hadoop.util.concurrent.HadoopExecutors;
 +import org.apache.ratis.proto.RaftProtos;
 +import org.apache.ratis.protocol.Message;
 +import org.apache.ratis.protocol.RaftClientRequest;
 +import org.apache.ratis.protocol.RaftGroupId;
 +import org.apache.ratis.server.RaftServer;
 +import org.apache.ratis.server.protocol.TermIndex;
 +import org.apache.ratis.server.storage.RaftStorage;
 +import org.apache.ratis.statemachine.SnapshotInfo;
 +import org.apache.ratis.statemachine.TransactionContext;
 +import org.apache.ratis.statemachine.impl.BaseStateMachine;
 +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
 +import org.apache.ratis.util.LifeCycle;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.io.IOException;
 +import java.util.Collection;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.ConcurrentSkipListMap;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.ThreadFactory;
 +import java.util.concurrent.TimeUnit;
 +
 +/**
 + * Class for SCM StateMachine.
 + */
 +public class SCMStateMachine extends BaseStateMachine {
 +  static final Logger LOG =
 +      LoggerFactory.getLogger(SCMStateMachine.class);
 +  private final SimpleStateMachineStorage storage =
 +      new SimpleStateMachineStorage();
 +  private final SCMRatisServer scmRatisServer;
 +  private final StorageContainerManager scm;
 +  private RaftGroupId raftGroupId;
 +  private final SCMRatisSnapshotInfo snapshotInfo;
 +  private final ExecutorService executorService;
 +  private final ExecutorService installSnapshotExecutor;
 +
 +  // Map of index to term for the Ratis transactions (state machine
 +  // entries) that are received through applyTransaction.
 +  private ConcurrentMap<Long, Long> applyTransactionMap =
 +      new ConcurrentSkipListMap<>();
 +
 +  /**
 +   * Create a SCM state machine.
 +   */
 +  public SCMStateMachine(SCMRatisServer ratisServer) {
 +    this.scmRatisServer = ratisServer;
 +    this.scm = ratisServer.getSCM();
 +
 +    // TODO: remove the whole file later
 +    this.snapshotInfo = null;
 +    updateLastAppliedIndexWithSnaphsotIndex();
 +
 +    ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
 +        .setNameFormat("SCM StateMachine ApplyTransaction Thread - %d").build();
 +    this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
 +    this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
 +  }
 +
 +  /**
 +   * Initializes the State Machine with the given server, group and storage.
 +   */
 +  @Override
 +  public void initialize(RaftServer server, RaftGroupId id,
 +                         RaftStorage raftStorage) throws IOException {
 +    getLifeCycle().startAndTransition(() -> {
 +      super.initialize(server, id, raftStorage);
 +      this.raftGroupId = id;
 +      storage.init(raftStorage);
 +    });
 +  }
 +
 +  /**
 +   * Pre-execute the update request into state machine.
 +   */
 +  @Override
 +  public TransactionContext startTransaction(
 +      RaftClientRequest raftClientRequest) {
 +    return TransactionContext.newBuilder()
 +        .setClientRequest(raftClientRequest)
 +        .setStateMachine(this)
 +        .setServerRole(RaftProtos.RaftPeerRole.LEADER)
 +        .setLogData(raftClientRequest.getMessage().getContent())
 +        .build();
 +  }
 +
 +  /**
 +   * Apply a committed log entry to state machine.
 +   */
 +  @Override
 +  public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
 +    CompletableFuture<Message> ratisFuture =
 +        new CompletableFuture<>();
 +    //TODO execute SCMRequest and process SCMResponse
 +    return ratisFuture;
 +  }
 +
 +  /**
 +   * Query state machine.
 +   */
 +  @Override
 +  public CompletableFuture<Message> query(Message request) {
 +    //TODO make handler respond to the query request.
 +    return CompletableFuture.completedFuture(request);
 +  }
 +
 +  /**
 +   * Pause state machine.
 +   */
 +  @Override
 +  public void pause() {
 +    getLifeCycle().transition(LifeCycle.State.PAUSING);
 +    getLifeCycle().transition(LifeCycle.State.PAUSED);
 +  }
 +
 +  /**
 +   * Unpause the state machine and update the lastAppliedIndex.
 +   * Called after a new state has been uploaded to the state machine.
 +   */
 +  public void unpause(long newLastAppliedSnaphsotIndex,
 +                      long newLastAppliedSnapShotTermIndex) {
 +    getLifeCycle().startAndTransition(() -> {
 +      this.setLastAppliedTermIndex(TermIndex.newTermIndex(
 +          newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
 +    });
 +  }
 +
 +  /**
 +   * Take SCM snapshot and write index to file.
 +   * @return actual index or 0 if error.
 +   */
 +  @Override
 +  public long takeSnapshot() throws IOException {
 +    LOG.info("Saving Ratis snapshot on the SCM.");
 +    if (scm != null) {
 +      // TODO: remove the whole file later
 +      return 0;
 +    }
 +    return 0;
 +  }
 +
 +  /**
 +   * Get latest SCM snapshot.
 +   */
 +  @Override
 +  public SnapshotInfo getLatestSnapshot() {
 +    return snapshotInfo;
 +  }
 +
 +  private synchronized void updateLastApplied() {
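 +    // Advance lastAppliedTermIndex over the contiguous run of indices that
 +    // have been recorded in applyTransactionMap.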
 +    Long appliedTerm = null;
 +    long appliedIndex = -1;
 +    for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
 +      final Long removed = applyTransactionMap.remove(i);
 +      if (removed == null) {
 +        break;
 +      }
 +      appliedTerm = removed;
 +      appliedIndex = i;
 +    }
 +    if (appliedTerm != null) {
 +      updateLastAppliedTermIndex(appliedTerm, appliedIndex);
 +    }
 +  }
 +
 +  /**
 +   * Called to notify the state machine about indexes that are processed
 +   * internally by the Raft server; this currently happens when conf entries
 +   * are processed by the Raft server. This lets the state machine keep
 +   * track of index updates.
 +   */
-   @Override
 +  public void notifyIndexUpdate(long currentTerm, long index) {
 +    applyTransactionMap.put(index, currentTerm);
 +    updateLastApplied();
 +    snapshotInfo.updateTerm(currentTerm);
 +  }
 +
 +  /**
 +   * Notifies the state machine that the raft peer is no longer leader.
 +   */
 +  @Override
 +  public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
 +    scmRatisServer.updateServerRole();
 +  }
 +
 +  /**
 +   * Convert a state machine log entry proto to a String.
 +   */
 +  @Override
 +  public String toStateMachineLogEntryString(
 +      RaftProtos.StateMachineLogEntryProto proto) {
 +    //TODO implement transfer from proto to SCMRequest body.
 +    return null;
 +  }
 +
 +  /**
 +   * Set the lastAppliedTermIndex from the snapshot info.
 +   */
 +  public void updateLastAppliedIndexWithSnaphsotIndex() {
 +    setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
 +        snapshotInfo.getIndex()));
 +    LOG.info("LastAppliedIndex set from SnapShotInfo {}",
 +        getLastAppliedTermIndex());
 +  }
 +
 +  @VisibleForTesting
 +  void addApplyTransactionTermIndex(long term, long index) {
 +    applyTransactionMap.put(index, term);
 +  }
 +
 +  public void stop() {
 +    HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
 +    HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
 +  }
 +}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 886eaee,0000000..ac58438
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@@ -1,208 -1,0 +1,224 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p/>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p/>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.ha;
 +
 +import java.io.IOException;
 +import java.lang.reflect.InvocationTargetException;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.EnumMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +import com.google.protobuf.InvalidProtocolBufferException;
 +import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
 +import org.apache.ratis.protocol.ClientId;
 +import org.apache.ratis.protocol.Message;
 +import org.apache.ratis.protocol.RaftClientReply;
 +import org.apache.ratis.protocol.RaftGroupId;
 +import org.apache.ratis.protocol.RaftGroupMemberId;
 +import org.apache.ratis.protocol.RaftPeer;
 +import org.apache.ratis.protocol.RaftPeerId;
 +import org.apache.ratis.protocol.exceptions.NotLeaderException;
 +import org.apache.ratis.server.RaftServer;
 +import org.apache.ratis.protocol.exceptions.StateMachineException;
 +
 +/**
 + * Mock SCMHAManager implementation for testing.
 + */
 +public final class MockSCMHAManager implements SCMHAManager {
 +
 +  private final SCMRatisServer ratisServer;
 +  private boolean isLeader;
 +
 +  public static SCMHAManager getInstance() {
 +    return new MockSCMHAManager();
 +  }
 +
 +  public static SCMHAManager getLeaderInstance() {
 +    MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
 +    mockSCMHAManager.setIsLeader(true);
 +    return mockSCMHAManager;
 +  }
 +
 +  public static SCMHAManager getFollowerInstance() {
 +    MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
 +    mockSCMHAManager.setIsLeader(false);
 +    return mockSCMHAManager;
 +  }
 +
 +  /**
 +   * Creates MockSCMHAManager instance.
 +   */
 +  private MockSCMHAManager() {
 +    this.ratisServer = new MockRatisServer();
 +    this.isLeader = true;
 +  }
 +
 +  @Override
 +  public void start() throws IOException {
 +    ratisServer.start();
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public boolean isLeader() {
 +    return isLeader;
 +  }
 +
 +  public void setIsLeader(boolean isLeader) {
 +    this.isLeader = isLeader;
 +  }
 +
 +  @Override
 +  public RaftPeer getSuggestedLeader() {
 +    throw new UnsupportedOperationException();
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public SCMRatisServer getRatisServer() {
 +    return ratisServer;
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public void shutdown() throws IOException {
 +    ratisServer.stop();
 +  }
 +
 +  @Override
 +  public List<String> getRatisRoles() {
 +    return Arrays.asList(
 +        "180.3.14.5:9865",
 +        "180.3.14.21:9865",
 +        "180.3.14.145:9865");
 +  }
 +
 +  /**
 +   * {@inheritDoc}
 +   */
 +  @Override
 +  public NotLeaderException triggerNotLeaderException() {
 +    return new NotLeaderException(RaftGroupMemberId.valueOf(
 +        RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
 +        null, new ArrayList<>());
 +  }
 +
 +  private static class MockRatisServer implements SCMRatisServer {
 +
 +    private Map<RequestType, Object> handlers =
 +        new EnumMap<>(RequestType.class);
 +
 +    @Override
 +    public void start() {
 +    }
 +
 +    @Override
 +    public void registerStateMachineHandler(final RequestType handlerType,
 +                                            final Object handler) {
 +      handlers.put(handlerType, handler);
 +    }
 +
 +    @Override
 +    public SCMRatisResponse submitRequest(final SCMRatisRequest request)
 +        throws IOException {
 +      final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
 +          RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
 +      RaftClientReply reply;
 +      try {
 +        final Message result = process(request);
-         return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
-             raftId, 1L, true, result, null, 1L, null));
++        reply = RaftClientReply.newBuilder()
++            .setClientId(ClientId.randomId())
++            .setServerId(raftId)
++            .setGroupId(RaftGroupId.emptyGroupId())
++            .setCallId(1L)
++            .setSuccess(true)
++            .setMessage(result)
++            .setException(null)
++            .setLogIndex(1L)
++            .build();
 +      } catch (Exception ex) {
-         return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
-             raftId, 1L, false, null,
-             new StateMachineException(raftId, ex), 1L, null));
++        reply = RaftClientReply.newBuilder()
++            .setClientId(ClientId.randomId())
++            .setServerId(raftId)
++            .setGroupId(RaftGroupId.emptyGroupId())
++            .setCallId(1L)
++            .setSuccess(false)
++            .setMessage(Message.EMPTY)
++            .setException(new StateMachineException(raftId, ex))
++            .setLogIndex(1L)
++            .build();
 +      }
++      return SCMRatisResponse.decode(reply);
 +    }
 +
 +    private Message process(final SCMRatisRequest request)
 +        throws Exception {
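 +      // Look up the handler registered for this request type and invoke the
 +      // requested operation reflectively with the request's arguments.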
 +      try {
 +        final Object handler = handlers.get(request.getType());
 +
 +        if (handler == null) {
 +          throw new IOException("No handler found for request type " +
 +              request.getType());
 +        }
 +
 +        final List<Class<?>> argumentTypes = new ArrayList<>();
 +        for(Object args : request.getArguments()) {
 +          argumentTypes.add(args.getClass());
 +        }
 +        final Object result = handler.getClass().getMethod(
 +            request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
 +            .invoke(handler, request.getArguments());
 +
 +        return SCMRatisResponse.encode(result);
 +      } catch (NoSuchMethodException | SecurityException ex) {
 +        throw new InvalidProtocolBufferException(ex.getMessage());
 +      } catch (InvocationTargetException e) {
 +        final Exception targetEx = (Exception) e.getTargetException();
 +        throw targetEx != null ? targetEx : e;
 +      }
 +    }
 +
 +    @Override
 +    public RaftServer getServer() {
 +      return null;
 +    }
 +
 +    @Override
 +    public RaftGroupId getRaftGroupId() {
 +      return null;
 +    }
 +
 +    @Override
 +    public List<RaftPeer> getRaftPeers() {
 +      return new ArrayList<>();
 +    }
 +
 +    @Override
 +    public void stop() {
 +    }
 +  }
 +
 +}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisResponse.java
index 05e2970,0000000..7ecbf2a
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisResponse.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisResponse.java
@@@ -1,73 -1,0 +1,89 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.ha;
 +
 +import com.google.protobuf.InvalidProtocolBufferException;
 +import org.apache.ratis.protocol.ClientId;
 +import org.apache.ratis.protocol.Message;
 +import org.apache.ratis.protocol.RaftClientReply;
 +import org.apache.ratis.protocol.RaftGroupId;
 +import org.apache.ratis.protocol.RaftGroupMemberId;
 +import org.apache.ratis.protocol.RaftPeerId;
 +import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
 +import org.apache.ratis.protocol.exceptions.RaftException;
 +import org.junit.Assert;
 +import org.junit.Before;
 +import org.junit.Test;
 +
 +/**
 + * Test for SCMRatisResponse.
 + */
 +public class TestSCMRatisResponse {
 +  private RaftGroupMemberId raftId;
 +
 +  @Before
 +  public void init() {
 +    raftId = RaftGroupMemberId.valueOf(
 +        RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
 +  }
 +
 +  @Test
 +  public void testEncodeAndDecodeSuccess() throws Exception {
-     SCMRatisResponse response = SCMRatisResponse.decode(new RaftClientReply(
-         ClientId.randomId(), raftId, 1L, true, Message.EMPTY,
-         null, 1L, null));
++    RaftClientReply reply = RaftClientReply.newBuilder()
++        .setClientId(ClientId.randomId())
++        .setServerId(raftId)
++        .setGroupId(RaftGroupId.emptyGroupId())
++        .setCallId(1L)
++        .setSuccess(true)
++        .setMessage(Message.EMPTY)
++        .setException(null)
++        .setLogIndex(1L)
++        .build();
++    SCMRatisResponse response = SCMRatisResponse.decode(reply);
 +    Assert.assertTrue(response.isSuccess());
 +    Assert.assertEquals(Message.EMPTY,
 +        SCMRatisResponse.encode(response.getResult()));
 +  }
 +
 +  @Test
 +  public void testDecodeOperationFailureWithException() throws Exception {
-     SCMRatisResponse response = SCMRatisResponse.decode(new RaftClientReply(
-         ClientId.randomId(), raftId, 1L, false, Message.EMPTY,
-         new LeaderNotReadyException(raftId), 1L, null));
++    RaftClientReply reply = RaftClientReply.newBuilder()
++        .setClientId(ClientId.randomId())
++        .setServerId(raftId)
++        .setGroupId(RaftGroupId.emptyGroupId())
++        .setCallId(1L)
++        .setSuccess(false)
++        .setMessage(Message.EMPTY)
++        .setException(new LeaderNotReadyException(raftId))
++        .setLogIndex(1L)
++        .build();
++    SCMRatisResponse response = SCMRatisResponse.decode(reply);
 +    Assert.assertFalse(response.isSuccess());
 +    Assert.assertTrue(response.getException() instanceof RaftException);
 +    Assert.assertNull(response.getResult());
 +  }
 +
 +  @Test(expected = InvalidProtocolBufferException.class)
 +  public void testEncodeFailureWithNonProto() throws Exception {
 +    // Non proto input
 +    Message message = Message.valueOf("test");
 +    // Should fail with exception.
 +    SCMRatisResponse.encode(message);
 +  }
 +}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index f35318d,e770ba9..8426084
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@@ -145,17 -136,8 +147,18 @@@ public class TestHealthyPipelineSafeMod
        Pipeline pipeline3 =
            pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
                HddsProtos.ReplicationFactor.THREE);
+       pipelineManager.openPipeline(pipeline3.getId());
  
 +      // Mark pipeline healthy
 +      pipeline1 = pipelineManager.getPipeline(pipeline1.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline1);
 +
 +      pipeline2 = pipelineManager.getPipeline(pipeline2.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline2);
 +
 +      pipeline3 = pipelineManager.getPipeline(pipeline3.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
 +
        SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
            config, containers, pipelineManager, eventQueue);
  
@@@ -236,16 -215,8 +241,17 @@@
        Pipeline pipeline3 =
            pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
                HddsProtos.ReplicationFactor.THREE);
+       pipelineManager.openPipeline(pipeline3.getId());
  
 +      // Mark pipeline healthy
 +      pipeline1 = pipelineManager.getPipeline(pipeline1.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline1);
 +
 +      pipeline2 = pipelineManager.getPipeline(pipeline2.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline2);
 +
 +      pipeline3 = pipelineManager.getPipeline(pipeline3.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
  
        SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
            config, containers, pipelineManager, eventQueue);
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 5aa67a3,6430247..6860da2
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@@ -27,13 -32,10 +32,16 @@@ import org.apache.hadoop.hdds.scm.HddsT
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
  import org.apache.hadoop.hdds.scm.container.MockNodeManager;
  import org.apache.hadoop.hdds.scm.events.SCMEvents;
 +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
  import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
  import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 -import org.apache.hadoop.hdds.scm.pipeline.*;
 +import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
++import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
++import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
  import org.apache.hadoop.hdds.server.events.EventQueue;
  import org.apache.hadoop.test.GenericTestUtils;
  
@@@ -51,8 -53,9 +59,9 @@@ public class TestOneReplicaPipelineSafe
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();
    private OneReplicaPipelineSafeModeRule rule;
 -  private SCMPipelineManager pipelineManager;
 +  private PipelineManagerV2Impl pipelineManager;
    private EventQueue eventQueue;
+   private MockNodeManager mockNodeManager;
  
    private void setup(int nodes, int pipelineFactorThreeCount,
        int pipelineFactorOneCount) throws Exception {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 7901883,7831307..f081bac
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@@ -35,15 -37,11 +37,16 @@@ import org.apache.hadoop.hdds.scm.HddsT
  import org.apache.hadoop.hdds.scm.container.ContainerInfo;
  import org.apache.hadoop.hdds.scm.container.MockNodeManager;
  import org.apache.hadoop.hdds.scm.events.SCMEvents;
 +import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
  import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
  import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 -import org.apache.hadoop.hdds.scm.pipeline.*;
 +import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 +import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 +import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
  import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
  import org.apache.hadoop.hdds.server.events.EventHandler;
  import org.apache.hadoop.hdds.server.events.EventPublisher;
  import org.apache.hadoop.hdds.server.events.EventQueue;
@@@ -385,15 -362,11 +388,16 @@@ public class TestSCMSafeModeManager 
          mockRatisProvider);
      pipelineManager.allowPipelineCreation();
  
 -    for (int i=0; i < pipelineCount; i++) {
 -      Pipeline pipeline = pipelineManager.
 -              createPipeline(HddsProtos.ReplicationType.RATIS,
 +    for (int i = 0; i < pipelineCount; i++) {
 +      // Create pipeline
 +      Pipeline pipeline = pipelineManager.createPipeline(
 +          HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE);
 +
+       pipelineManager.openPipeline(pipeline.getId());
 +      // Mark pipeline healthy
 +      pipeline = pipelineManager.getPipeline(pipeline.getId());
 +      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
      }
  
      for (ContainerInfo container : containers) {
diff --cc hadoop-ozone/fault-injection-test/mini-chaos-tests/pom.xml
index 5523150,5523150..66a44a3
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/pom.xml
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/pom.xml
@@@ -50,6 -50,6 +50,12 @@@ http://maven.apache.org/xsd/maven-4.0.0
      </dependency>
      <dependency>
        <groupId>org.apache.hadoop</groupId>
++      <artifactId>hadoop-hdds-server-scm</artifactId>
++      <scope>test</scope>
++      <type>test-jar</type>
++    </dependency>
++    <dependency>
++      <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdds-hadoop-dependency-test</artifactId>
        <scope>test</scope>
      </dependency>
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 394e43c,629ab5a..099b069
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@@ -54,8 -40,8 +40,9 @@@ import org.apache.hadoop.hdds.annotatio
  import org.apache.hadoop.hdds.conf.OzoneConfiguration;
  import org.apache.hadoop.hdds.protocol.DatanodeDetails;
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
  import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.hadoop.hdds.scm.TestUtils;
  import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
  import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
  import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index ce696c5,135fb51..c8c05df
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@@ -17,37 -17,8 +17,40 @@@
   */
  package org.apache.hadoop.ozone;
  
 +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
 +    .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
 +import static org.apache.hadoop.hdds.HddsConfigKeys
 +    .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 +import static org.apache.hadoop.hdds.HddsConfigKeys
 +    .HDDS_CONTAINER_REPORT_INTERVAL;
 +import static org.apache.hadoop.hdds.HddsConfigKeys
 +    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 +import static org.junit.Assert.fail;
 +
 +import org.apache.hadoop.hdds.scm.TestUtils;
 +import static org.mockito.Matchers.argThat;
 +import static org.mockito.Matchers.eq;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.verify;
 +import static org.mockito.Mockito.when;
 +
 +import java.io.File;
 +import java.io.IOException;
 +import java.lang.reflect.Field;
 +import java.lang.reflect.Modifier;
 +import java.nio.file.Path;
 +import java.nio.file.Paths;
++import java.time.Duration;
 +import java.util.Collections;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.TimeUnit;
 +
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
  import org.apache.commons.io.FileUtils;
  import org.apache.commons.lang3.RandomUtils;
  import org.apache.hadoop.hdds.HddsConfigKeys;
@@@ -104,8 -76,27 +108,6 @@@ import org.mockito.Mockito
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
 -import java.io.File;
 -import java.io.IOException;
 -import java.lang.reflect.Field;
 -import java.lang.reflect.Modifier;
 -import java.nio.file.Path;
 -import java.nio.file.Paths;
 -import java.time.Duration;
 -import java.util.Map;
 -import java.util.List;
 -import java.util.Set;
 -import java.util.Collections;
 -import java.util.HashSet;
 -import java.util.UUID;
 -import java.util.concurrent.TimeUnit;
 -
 -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
 -import static org.apache.hadoop.hdds.HddsConfigKeys.*;
 -import static org.junit.Assert.fail;
 -import static org.mockito.Matchers.argThat;
 -import static org.mockito.Matchers.eq;
 -import static org.mockito.Mockito.*;
  
  /**
   * Test class that exercises the StorageContainerManager.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org