You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2020/11/30 13:52:00 UTC
[ozone] 01/01: Merge branch 'master' into HDDS-2823
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit f30bc4ee66681fe9b554cd322be0cedf29d8cde7
Merge: 44a6503 130ba4d
Author: Nandakumar <na...@apache.org>
AuthorDate: Mon Nov 30 16:55:07 2020 +0530
Merge branch 'master' into HDDS-2823
.asf.yaml | 2 +-
README.md | 22 +-
hadoop-hdds/client/pom.xml | 5 -
.../apache/hadoop/hdds/scm/OzoneClientConfig.java | 212 +++++++++
.../apache/hadoop/hdds/scm/XceiverClientRatis.java | 12 +-
.../hadoop/hdds/scm/storage/BlockOutputStream.java | 66 ++-
.../hadoop/hdds/scm/storage/CommitWatcher.java | 2 +-
.../storage/TestBlockOutputStreamCorrectness.java | 17 +-
.../hdds/scm/storage/TestChunkInputStream.java | 11 +-
.../hadoop/hdds/protocol/DatanodeDetails.java | 5 +-
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 12 +-
.../java/org/apache/hadoop/hdds/scm/ScmConfig.java | 43 ++
.../hdds/scm/storage/ContainerProtocolCalls.java | 4 +-
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 73 +--
.../org/apache/hadoop/ozone/common/Checksum.java | 22 +-
.../common/src/main/resources/ozone-default.xml | 91 +---
.../ratis/TestContainerCommandRequestMessage.java | 13 +-
.../hadoop/ozone/audit/TestOzoneAuditLogger.java | 8 +-
.../common/src/test/resources/auditlog.properties | 2 +-
.../org/apache/hadoop/hdds/conf/ConfigTag.java | 1 +
.../hdds/conf/ConfigurationReflectionUtil.java | 14 +-
.../hadoop/hdds/conf/ConfigurationTarget.java | 4 +
hadoop-hdds/container-service/pom.xml | 11 +-
.../common/statemachine/DatanodeConfiguration.java | 24 +
.../common/statemachine/DatanodeStateMachine.java | 6 +
.../common/statemachine/EndpointStateMachine.java | 16 +
.../ClosePipelineCommandHandler.java | 15 +-
.../CreatePipelineCommandHandler.java | 30 +-
.../states/datanode/RunningDatanodeState.java | 8 +-
.../server/ratis/ContainerStateMachine.java | 4 +-
.../transport/server/ratis/XceiverServerRatis.java | 37 +-
.../container/common/utils/ContainerCache.java | 52 ++-
.../common/utils/ContainerCacheMetrics.java | 114 +++++
.../ozone/container/common/volume/HddsVolume.java | 12 +-
.../container/common/volume/VolumeIOStats.java | 72 ++-
.../container/keyvalue/KeyValueContainer.java | 3 +
.../container/keyvalue/helpers/BlockUtils.java | 54 +++
.../keyvalue/helpers/KeyValueContainerUtil.java | 112 +++--
.../background/BlockDeletingService.java | 19 +-
.../container/metadata/AbstractDatanodeStore.java | 69 ++-
.../metadata/DatanodeStoreSchemaOneImpl.java | 7 +-
.../metadata/DatanodeStoreSchemaTwoImpl.java | 7 +-
.../ozone/container/ozoneimpl/OzoneContainer.java | 12 +-
.../replication/DownloadAndImportReplicator.java | 12 +-
.../container/replication/GrpcOutputStream.java | 2 +-
.../replication/GrpcReplicationClient.java | 23 +-
.../replication/SimpleContainerDownloader.java | 36 +-
.../protocol/commands/CreatePipelineCommand.java | 16 +
.../main/resources/webapps/hddsDatanode/index.html | 2 +-
.../ozone/container/ContainerTestHelper.java | 6 +-
.../container/common/TestBlockDeletingService.java | 319 +++++++------
.../ozone/container/common/TestContainerCache.java | 10 +-
.../TestSchemaOneBackwardsCompatibility.java | 83 ++--
.../common/impl/TestContainerPersistence.java | 53 +--
.../TestCreatePipelineCommandHandler.java | 13 +-
.../container/keyvalue/TestKeyValueContainer.java | 53 ++-
.../container/ozoneimpl/TestContainerReader.java | 13 +-
.../container/ozoneimpl/TestOzoneContainer.java | 2 +
.../replication/TestReplicationSupervisor.java | 35 ++
.../replication/TestSimpleContainerDownloader.java | 85 ++++
hadoop-hdds/dev-support/checkstyle/checkstyle.xml | 4 +-
hadoop-hdds/docs/content/concept/Containers.zh.md | 47 ++
hadoop-hdds/docs/content/concept/OzoneManager.md | 4 +-
.../docs/content/concept/OzoneManager.zh.md | 92 +++-
hadoop-hdds/docs/content/concept/Recon.md | 163 +++++++
.../docs/content/concept/ReconHighLevelDesign.png | Bin 0 -> 239168 bytes
hadoop-hdds/docs/content/concept/ReconOmDesign.png | Bin 0 -> 162797 bytes
.../docs/content/concept/ReconScmDesign.png | Bin 0 -> 181628 bytes
.../content/concept/StorageContainerManager.md | 5 +-
.../content/concept/StorageContainerManager.zh.md | 69 ++-
hadoop-hdds/docs/content/feature/GDPR.md | 2 +-
hadoop-hdds/docs/content/feature/GDPR.zh.md | 5 +-
hadoop-hdds/docs/content/feature/HA.md | 4 +-
hadoop-hdds/docs/content/feature/HA.zh.md | 117 +++++
hadoop-hdds/docs/content/feature/Quota.md | 79 ++++
hadoop-hdds/docs/content/feature/Quota.zh.md | 74 +++
hadoop-hdds/docs/content/feature/Recon.md | 26 +-
hadoop-hdds/docs/content/feature/Recon.zh.md | 49 ++
hadoop-hdds/docs/content/feature/_index.zh.md | 7 +-
hadoop-hdds/docs/content/interface/ReconApi.md | 511 +++++++++++++++++++++
hadoop-hdds/docs/content/interface/_index.zh.md | 2 +-
hadoop-hdds/docs/content/tools/Admin.md | 4 +-
hadoop-hdds/framework/pom.xml | 20 +-
.../hdds/conf/DatanodeRatisServerConfig.java | 19 +-
.../hadoop/hdds/utils/db/DBStoreBuilder.java | 358 ++++++++-------
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 14 +-
.../hadoop/hdds/utils/db/TestDBStoreBuilder.java | 21 +-
hadoop-hdds/hadoop-dependency-server/pom.xml | 6 +
hadoop-hdds/hadoop-dependency-test/pom.xml | 12 +
hadoop-hdds/pom.xml | 53 ++-
hadoop-hdds/server-scm/pom.xml | 6 +
.../apache/hadoop/hdds/scm/block/BlockManager.java | 4 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 50 +-
.../block/DatanodeDeletedBlockTransactions.java | 122 ++---
.../hadoop/hdds/scm/block/DeletedBlockLog.java | 4 +-
.../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 53 ++-
.../hdds/scm/block/SCMBlockDeletingService.java | 135 +++---
.../IncrementalContainerReportHandler.java | 4 +
.../hdds/scm/container/ReplicationManager.java | 4 +-
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 9 +-
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 8 +-
.../hadoop/hdds/scm/metadata/PipelineCodec.java | 3 +-
.../hdds/scm/pipeline/PipelineReportHandler.java | 8 +-
.../hdds/scm/pipeline/RatisPipelineUtils.java | 4 +-
.../hdds/scm/pipeline/SCMPipelineManager.java | 39 +-
.../scm/safemode/HealthyPipelineSafeModeRule.java | 3 +-
.../safemode/OneReplicaPipelineSafeModeRule.java | 45 +-
.../hdds/scm/server/SCMBlockProtocolServer.java | 85 ++--
.../hdds/scm/server/StorageContainerManager.java | 9 -
.../hdds/scm/server/ratis/SCMRatisServer.java | 10 +-
.../hdds/scm/server/ratis/SCMStateMachine.java | 1 -
.../src/main/resources/webapps/scm/index.html | 6 +-
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 97 ++--
.../hdds/scm/container/TestReplicationManager.java | 12 +-
.../hadoop/hdds/scm/ha/MockSCMHAManager.java | 26 +-
.../hadoop/hdds/scm/ha/TestSCMRatisResponse.java | 28 +-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 4 +-
.../safemode/TestHealthyPipelineSafeModeRule.java | 6 +
.../TestOneReplicaPipelineSafeModeRule.java | 71 ++-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 25 +
.../apache/hadoop/ozone/client/OzoneVolume.java | 17 -
.../ozone/client/io/BlockOutputStreamEntry.java | 131 +-----
.../client/io/BlockOutputStreamEntryPool.java | 89 +---
.../hadoop/ozone/client/io/KeyOutputStream.java | 115 ++---
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 109 +----
hadoop-ozone/common/pom.xml | 5 +
.../main/java/org/apache/hadoop/ozone/OmUtils.java | 98 +++-
.../ozone/om/ha/OMFailoverProxyProvider.java | 2 +-
.../hadoop/ozone/om/helpers/OmBucketInfo.java | 22 +-
.../hadoop/ozone/om/helpers/OmVolumeArgs.java | 33 +-
...OzoneManagerProtocolClientSideTranslatorPB.java | 4 +-
.../hadoop/ozone/security/acl/OzoneObjInfo.java | 10 +
.../hadoop/ozone/security/acl/RequestContext.java | 48 +-
.../org/apache/hadoop/ozone/util/ExitManager.java | 2 +-
.../java/org/apache/hadoop/ozone/TestOmUtils.java | 5 +
.../hadoop/ozone/om/lock/TestOzoneManagerLock.java | 52 +--
hadoop-ozone/datanode/pom.xml | 25 +-
hadoop-ozone/dist/pom.xml | 48 +-
.../docker-image/docker-krb5/Dockerfile-krb5 | 2 +-
.../compose/ozone-mr/hadoop27/docker-compose.yaml | 2 +-
.../compose/ozone-mr/hadoop31/docker-compose.yaml | 2 +-
.../dist/src/main/compose/ozone-mr/hadoop32/.env | 6 +-
.../compose/ozone-mr/hadoop32/docker-compose.yaml | 2 +-
.../dist/src/main/compose/ozonesecure-mr/.env | 3 +-
.../compose/ozonesecure-mr/docker-compose.yaml | 12 +-
.../dist/src/main/compose/ozonesecure-mr/test.sh | 1 +
.../compose/ozonesecure-om-ha/docker-compose.yaml | 4 +-
.../main/compose/ozonesecure/docker-compose.yaml | 2 +
.../main/smoketest/security/ozone-secure-fs.robot | 19 +-
hadoop-ozone/dist/src/shell/ozone/ozone | 11 +-
hadoop-ozone/dist/src/shell/ozone/stop-ozone.sh | 5 +
.../fault-injection-test/mini-chaos-tests/pom.xml | 6 +
.../mini-chaos-tests/src/test/bin/start-chaos.sh | 2 +-
.../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 31 +-
.../hadoop/ozone/MiniOzoneLoadGenerator.java | 8 +-
.../apache/hadoop/ozone/OzoneChaosCluster.java} | 45 +-
.../hadoop/ozone/TestAllMiniChaosOzoneCluster.java | 55 +++
.../ozone/TestDatanodeMiniChaosOzoneCluster.java | 53 +++
.../hadoop/ozone/TestMiniChaosOzoneCluster.java | 100 ++--
.../TestOzoneManagerMiniChaosOzoneCluster.java | 57 +++
.../hadoop/ozone/failure/FailureManager.java | 3 +-
.../org/apache/hadoop/ozone/failure/Failures.java | 13 +
.../ozone/loadgenerators/AgedDirLoadGenerator.java | 1 -
.../ozone/loadgenerators/AgedLoadGenerator.java | 6 +-
.../loadgenerators/FilesystemLoadGenerator.java | 1 -
.../{utils => loadgenerators}/LoadBucket.java | 2 +-
.../hadoop/ozone/loadgenerators/LoadGenerator.java | 17 +
.../loadgenerators/NestedDirLoadGenerator.java | 1 -
.../loadgenerators/RandomDirLoadGenerator.java | 1 -
.../ozone/loadgenerators/RandomLoadGenerator.java | 1 -
.../loadgenerators/ReadOnlyLoadGenerator.java | 1 -
.../src/test/resources/log4j.properties | 2 +-
hadoop-ozone/integration-test/pom.xml | 6 +
.../fs/ozone/TestOzoneFSWithObjectStoreCreate.java | 71 ++-
.../hadoop/fs/ozone/TestOzoneFileSystem.java | 13 +-
.../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 85 +++-
.../hadoop/hdds/scm/pipeline/TestSCMRestart.java | 6 +
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 49 +-
.../org/apache/hadoop/ozone/RatisTestHelper.java | 3 +-
.../hadoop/ozone/TestStorageContainerManager.java | 18 +-
.../ozone/client/rpc/TestBlockOutputStream.java | 32 +-
.../rpc/TestBlockOutputStreamFlushDelay.java | 22 +-
.../rpc/TestBlockOutputStreamWithFailures.java | 38 +-
...estBlockOutputStreamWithFailuresFlushDelay.java | 34 +-
.../rpc/TestCloseContainerHandlingByClient.java | 35 +-
.../hadoop/ozone/client/rpc/TestCommitWatcher.java | 7 +-
.../client/rpc/TestContainerStateMachine.java | 35 +-
.../rpc/TestContainerStateMachineFailures.java | 64 ++-
.../client/rpc/TestDeleteWithSlowFollower.java | 13 +-
.../client/rpc/TestDiscardPreallocatedBlocks.java | 35 +-
.../client/rpc/TestFailureHandlingByClient.java | 7 +-
.../ozone/client/rpc/TestKeyInputStream.java | 33 +-
.../rpc/TestOzoneClientRetriesOnException.java | 40 +-
...estOzoneClientRetriesOnExceptionFlushDelay.java | 31 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 5 +-
.../client/rpc/TestOzoneRpcClientAbstract.java | 319 +------------
.../client/rpc/TestValidateBCSIDOnRestart.java | 41 +-
.../ozone/client/rpc/TestWatchForCommit.java | 46 +-
.../commandhandler/TestBlockDeletion.java | 17 +-
.../container/metrics/TestContainerMetrics.java | 49 +-
.../org/apache/hadoop/ozone/om/TestOmAcls.java | 9 +-
.../org/apache/hadoop/ozone/om/TestOmMetrics.java | 105 +++++
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 47 ++
.../ozone/om/TestOzoneManagerHAKeyDeletion.java | 77 ++++
.../ozone/om/TestOzoneManagerHAWithData.java | 37 --
.../ozone/om/TestOzoneManagerListVolumes.java | 36 +-
.../hadoop/ozone/om/TestOzoneManagerRestart.java | 100 +++-
.../hadoop/ozone/shell/TestOzoneShellHA.java | 42 ++
.../src/test/resources/contract/ozone.xml | 5 +
.../src/main/proto/OmClientProtocol.proto | 4 +-
hadoop-ozone/interface-storage/pom.xml | 14 +-
.../apache/hadoop/ozone/om/OMMetadataManager.java | 11 +-
.../hadoop/ozone/om/codec/UserVolumeInfoCodec.java | 18 +-
.../hadoop/ozone/om/ratis/OMTransactionInfo.java | 6 +-
.../src/main/proto/OmStorageProtocol.proto | 6 +
hadoop-ozone/ozone-manager/pom.xml | 6 +
.../apache/hadoop/ozone/om/BucketManagerImpl.java | 32 +-
.../apache/hadoop/ozone/om/KeyDeletingService.java | 19 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 85 ++--
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 37 ++
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 35 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 180 ++++++--
.../apache/hadoop/ozone/om/PrefixManagerImpl.java | 31 +-
.../hadoop/ozone/om/S3SecretManagerImpl.java | 8 +-
.../apache/hadoop/ozone/om/TrashPolicyOzone.java | 235 ++++++++++
.../apache/hadoop/ozone/om/VolumeManagerImpl.java | 79 ++--
.../hadoop/ozone/om/codec/OMDBDefinition.java | 6 +-
.../ozone/om/ratis/OzoneManagerDoubleBuffer.java | 29 +-
.../ozone/om/ratis/OzoneManagerRatisServer.java | 50 +-
.../ozone/om/ratis/OzoneManagerStateMachine.java | 2 +-
.../hadoop/ozone/om/request/OMClientRequest.java | 46 +-
.../om/request/bucket/OMBucketCreateRequest.java | 4 +-
.../request/bucket/OMBucketSetPropertyRequest.java | 15 +-
.../om/request/bucket/acl/OMBucketAclRequest.java | 15 +
.../request/bucket/acl/OMBucketAddAclRequest.java | 23 +
.../bucket/acl/OMBucketRemoveAclRequest.java | 24 +
.../request/bucket/acl/OMBucketSetAclRequest.java | 24 +
.../om/request/file/OMDirectoryCreateRequest.java | 23 +-
.../ozone/om/request/file/OMFileCreateRequest.java | 9 +-
.../ozone/om/request/file/OMFileRequest.java | 30 --
.../om/request/key/OMAllocateBlockRequest.java | 16 +-
.../ozone/om/request/key/OMKeyCommitRequest.java | 14 +-
.../ozone/om/request/key/OMKeyCreateRequest.java | 8 +-
.../ozone/om/request/key/OMKeyDeleteRequest.java | 7 +-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 75 +--
.../ozone/om/request/key/OMKeysDeleteRequest.java | 11 +-
.../ozone/om/request/key/OMKeysRenameRequest.java | 9 +-
.../ozone/om/request/key/acl/OMKeyAclRequest.java | 15 +
.../om/request/key/acl/OMKeyAddAclRequest.java | 24 +
.../om/request/key/acl/OMKeyRemoveAclRequest.java | 24 +
.../om/request/key/acl/OMKeySetAclRequest.java | 24 +
.../request/key/acl/prefix/OMPrefixAclRequest.java | 7 +
.../S3InitiateMultipartUploadRequest.java | 3 +-
.../multipart/S3MultipartUploadAbortRequest.java | 5 +-
.../S3MultipartUploadCommitPartRequest.java | 10 +-
.../hadoop/ozone/om/request/util/ObjectParser.java | 6 +-
.../om/request/volume/OMVolumeCreateRequest.java | 8 +-
.../om/request/volume/OMVolumeDeleteRequest.java | 4 +-
.../ozone/om/request/volume/OMVolumeRequest.java | 24 +-
.../om/request/volume/OMVolumeSetOwnerRequest.java | 13 +-
.../om/request/volume/OMVolumeSetQuotaRequest.java | 6 +-
.../om/request/volume/acl/OMVolumeAclRequest.java | 14 +
.../request/volume/acl/OMVolumeAddAclRequest.java | 24 +
.../volume/acl/OMVolumeRemoveAclRequest.java | 24 +
.../request/volume/acl/OMVolumeSetAclRequest.java | 24 +
.../om/response/file/OMFileCreateResponse.java | 5 +
.../om/response/key/OMAllocateBlockResponse.java | 4 -
.../ozone/om/response/key/OMKeyCommitResponse.java | 4 -
.../ozone/om/response/key/OMKeyCreateResponse.java | 7 +-
.../ozone/om/response/key/OMKeyDeleteResponse.java | 4 -
.../om/response/key/OMKeysDeleteResponse.java | 4 -
.../multipart/S3MultipartUploadAbortResponse.java | 4 -
.../S3MultipartUploadCommitPartResponse.java | 5 +-
.../om/response/volume/OMVolumeCreateResponse.java | 7 +-
.../om/response/volume/OMVolumeDeleteResponse.java | 10 +-
.../response/volume/OMVolumeSetOwnerResponse.java | 13 +-
...OzoneManagerProtocolServerSideTranslatorPB.java | 9 +-
.../ozone/security/acl/OzoneNativeAuthorizer.java | 44 +-
.../apache/hadoop/ozone/om/TestKeyManagerUnit.java | 75 ++-
.../om/ratis/TestOzoneManagerStateMachine.java | 10 +-
.../ozone/om/request/TestOMRequestUtils.java | 68 ++-
.../bucket/TestOMBucketSetPropertyRequest.java | 3 +-
.../bucket/acl/TestOMBucketAddAclRequest.java | 119 +++++
.../bucket/acl/TestOMBucketRemoveAclRequest.java | 137 ++++++
.../bucket/acl/TestOMBucketSetAclRequest.java | 125 +++++
.../ozone/om/request/bucket/acl/package-info.java} | 30 +-
.../ozone/om/request/key/TestOMKeyAclRequest.java | 155 ++++++-
.../request/volume/TestOMVolumeCreateRequest.java | 10 +-
.../volume/TestOMVolumeSetOwnerRequest.java | 8 +-
.../volume/TestOMVolumeSetQuotaRequest.java | 3 +-
.../volume/acl/TestOMVolumeAddAclRequest.java | 7 +
.../volume/acl/TestOMVolumeRemoveAclRequest.java | 8 +
.../volume/acl/TestOMVolumeSetAclRequest.java | 7 +
.../ozone/om/response/TestCleanupTableInfo.java | 328 ++++++++++++-
.../volume/TestOMVolumeCreateResponse.java | 6 +-
.../volume/TestOMVolumeDeleteResponse.java | 8 +-
.../volume/TestOMVolumeSetOwnerResponse.java | 11 +-
.../hadoop/ozone/security/acl/TestVolumeOwner.java | 298 ++++++++++++
.../hadoop/fs/ozone/BasicOzoneFileSystem.java | 26 +-
.../fs/ozone/BasicRootedOzoneFileSystem.java | 28 +-
hadoop-ozone/pom.xml | 57 ++-
.../ozone/recon/codegen/ReconSqlDbConfig.java | 2 +-
.../recon/schema/ReconTaskSchemaDefinition.java | 2 +-
hadoop-ozone/recon/pom.xml | 8 +-
.../persistence/DefaultDataSourceProvider.java | 6 +
.../recon/persistence/DerbyDataSourceProvider.java | 1 -
.../ozone/recon/tasks/FileSizeCountTask.java | 4 +-
.../hadoop/ozone/recon/tasks/ReconTaskConfig.java | 4 +-
.../persistence/TestReconWithDifferentSqlDBs.java | 5 +
.../recon/tasks/TestContainerKeyMapperTask.java | 2 +-
.../ozone/recon/tasks/TestFileSizeCountTask.java | 6 +
.../hadoop/ozone/s3/AWSSignatureProcessor.java | 43 +-
.../hadoop/ozone/s3/OzoneClientProducer.java | 61 ++-
.../apache/hadoop/ozone/s3/SignatureProcessor.java | 2 +
.../hadoop/ozone/s3/endpoint/BucketEndpoint.java | 8 +-
.../ozone/s3/endpoint/ListObjectResponse.java | 11 +
.../hadoop/ozone/s3/exception/S3ErrorTable.java | 5 +
.../hadoop/ozone/s3/TestOzoneClientProducer.java | 25 +-
.../hadoop/ozone/s3/endpoint/TestBucketPut.java | 5 +
.../hadoop/ozone/s3/endpoint/TestRootList.java | 5 +
hadoop-ozone/tools/pom.xml | 2 +-
.../hadoop/ozone/freon/DatanodeBlockPutter.java | 7 +-
.../freon/FollowerAppendLogEntryGenerator.java | 11 +-
.../ozone/freon/LeaderAppendLogEntryGenerator.java | 20 +-
.../hadoop/ozone/shell/SetSpaceQuotaOptions.java | 3 +-
.../java/org/apache/hadoop/ozone/shell/Shell.java | 24 +-
pom.xml | 18 +-
327 files changed, 7924 insertions(+), 3158 deletions(-)
diff --cc hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
index 39d481e,3d1d689..39dcaba
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigTag.java
@@@ -43,5 -43,5 +43,6 @@@ public enum ConfigTag
S3GATEWAY,
DATANODE,
RECON,
- DELETION
++ DELETION,
+ HA
}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 06d34ed,014c76c..8c23237
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@@ -53,12 -56,8 +56,10 @@@ import org.apache.hadoop.ozone.common.B
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.INVALID_BLOCK_SIZE;
- import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
- import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 5d43a75,aa55480..2fe558f
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@@ -323,27 -322,49 +322,49 @@@ public class DeletedBlockLogImp
public void close() throws IOException {
}
+ private void getTransaction(DeletedBlocksTransaction tx,
+ DatanodeDeletedBlockTransactions transactions) {
+ try {
+ Set<ContainerReplica> replicas = containerManager
- .getContainerReplicas(ContainerID.valueof(tx.getContainerID()));
++ .getContainerReplicas(ContainerID.valueOf(tx.getContainerID()));
+ for (ContainerReplica replica : replicas) {
+ UUID dnID = replica.getDatanodeDetails().getUuid();
+ Set<UUID> dnsWithTransactionCommitted =
+ transactionToDNsCommitMap.get(tx.getTxID());
+ if (dnsWithTransactionCommitted == null || !dnsWithTransactionCommitted
+ .contains(dnID)) {
+ // Transaction need not be sent to dns which have
+ // already committed it
+ transactions.addTransactionToDN(dnID, tx);
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Got container info error.", e);
+ }
+ }
+
@Override
- public Map<Long, Long> getTransactions(
- DatanodeDeletedBlockTransactions transactions) throws IOException {
+ public DatanodeDeletedBlockTransactions getTransactions(
+ int blockDeletionLimit) throws IOException {
lock.lock();
try {
- Map<Long, Long> deleteTransactionMap = new HashMap<>();
+ DatanodeDeletedBlockTransactions transactions =
+ new DatanodeDeletedBlockTransactions();
try (TableIterator<Long,
? extends Table.KeyValue<Long, DeletedBlocksTransaction>> iter =
scmMetadataStore.getDeletedBlocksTXTable().iterator()) {
- while (iter.hasNext()) {
+ int numBlocksAdded = 0;
+ while (iter.hasNext() && numBlocksAdded < blockDeletionLimit) {
Table.KeyValue<Long, DeletedBlocksTransaction> keyValue =
iter.next();
- DeletedBlocksTransaction block = keyValue.getValue();
- if (block.getCount() > -1 && block.getCount() <= maxRetry) {
- if (transactions.addTransaction(block,
- transactionToDNsCommitMap.get(block.getTxID()))) {
- deleteTransactionMap.put(block.getContainerID(),
- block.getTxID());
- transactionToDNsCommitMap
- .putIfAbsent(block.getTxID(), new LinkedHashSet<>());
- }
+ DeletedBlocksTransaction txn = keyValue.getValue();
- final ContainerID id = ContainerID.valueof(txn.getContainerID());
++ final ContainerID id = ContainerID.valueOf(txn.getContainerID());
+ if (txn.getCount() > -1 && txn.getCount() <= maxRetry
+ && !containerManager.getContainer(id).isOpen()) {
+ numBlocksAdded += txn.getLocalIDCount();
+ getTransaction(txn, transactions);
+ transactionToDNsCommitMap
+ .putIfAbsent(txn.getTxID(), new LinkedHashSet<>());
}
}
}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index c86c7d9,0000000..33f408d
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@@ -1,170 -1,0 +1,177 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerProxy;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * SCMHAManagerImpl uses Apache Ratis for HA implementation. We will have 2N+1
+ * node Ratis ring. The Ratis ring will have one Leader node and 2N follower
+ * nodes.
+ *
+ * TODO
+ *
+ */
+public class SCMHAManagerImpl implements SCMHAManager {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMHAManagerImpl.class);
+
+ private final SCMRatisServerImpl ratisServer;
+ private final ConfigurationSource conf;
+
+ /**
+ * Creates SCMHAManager instance.
+ */
+ public SCMHAManagerImpl(final ConfigurationSource conf) throws IOException {
+ this.conf = conf;
+ this.ratisServer = new SCMRatisServerImpl(
+ conf.getObject(SCMHAConfiguration.class), conf);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void start() throws IOException {
+ ratisServer.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLeader() {
+ if (!SCMHAUtils.isSCMHAEnabled(conf)) {
+ // When SCM HA is not enabled, the current SCM is always the leader.
+ return true;
+ }
+ RaftServer server = ratisServer.getServer();
+ Preconditions.checkState(server instanceof RaftServerProxy);
+ RaftServerImpl serverImpl = null;
+ try {
+ // SCM only has one raft group.
+ serverImpl = ((RaftServerProxy) server)
+ .getImpl(ratisServer.getRaftGroupId());
+ if (serverImpl != null) {
+ // Only when it's sure the current SCM is the leader, otherwise
+ // it should all return false.
+ return serverImpl.isLeader();
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
+ "whether it's leader. ", ioe);
+ }
+
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SCMRatisServer getRatisServer() {
+ return ratisServer;
+ }
+
+ private RaftPeerId getPeerIdFromRoleInfo(RaftServerImpl serverImpl) {
++ /*
++ TODO: Fix Me
++ Ratis API has changed.
++ RaftServerImpl#getRoleInfoProto is no more public.
++
+ if (serverImpl.isLeader()) {
+ return RaftPeerId.getRaftPeerId(
+ serverImpl.getRoleInfoProto().getLeaderInfo().toString());
+ } else if (serverImpl.isFollower()) {
+ return RaftPeerId.valueOf(
+ serverImpl.getRoleInfoProto().getFollowerInfo()
+ .getLeaderInfo().getId().getId());
+ } else {
+ return null;
+ }
++ */
++ return null;
+ }
+
+ @Override
+ public RaftPeer getSuggestedLeader() {
+ RaftServer server = ratisServer.getServer();
+ Preconditions.checkState(server instanceof RaftServerProxy);
+ RaftServerImpl serverImpl = null;
+ try {
+ // SCM only has one raft group.
+ serverImpl = ((RaftServerProxy) server)
+ .getImpl(ratisServer.getRaftGroupId());
+ if (serverImpl != null) {
+ RaftPeerId peerId = getPeerIdFromRoleInfo(serverImpl);
+ if (peerId != null) {
- return new RaftPeer(peerId);
++ return RaftPeer.newBuilder().setId(peerId).build();
+ }
+ return null;
+ }
+ } catch (IOException ioe) {
+ LOG.error("Fail to get RaftServer impl and therefore it's not clear " +
+ "whether it's leader. ", ioe);
+ }
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void shutdown() throws IOException {
+ ratisServer.stop();
+ }
+
+ @Override
+ public List<String> getRatisRoles() {
+ return getRatisServer()
+ .getRaftPeers()
+ .stream()
+ .map(peer -> peer.getAddress() == null ? "" : peer.getAddress())
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NotLeaderException triggerNotLeaderException() {
+ return new NotLeaderException(RaftGroupMemberId.valueOf(
+ ratisServer.getServer().getId(),
+ ratisServer.getRaftGroupId()),
+ getSuggestedLeader(),
+ ratisServer.getRaftPeers());
+ }
+}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
index 8611b1f,0000000..ab766c9
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java
@@@ -1,224 -1,0 +1,228 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * TODO.
+ */
+public class SCMRatisServerImpl implements SCMRatisServer {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMRatisServerImpl.class);
+
+ private final InetSocketAddress address;
+ private final RaftServer server;
+ private final RaftGroupId raftGroupId;
+ private final RaftGroup raftGroup;
+ private final RaftPeerId raftPeerId;
+ private final SCMStateMachine scmStateMachine;
+ private final ClientId clientId = ClientId.randomId();
+ private final AtomicLong callId = new AtomicLong();
+
+ // TODO: Refactor and remove ConfigurationSource and use only
+ // SCMHAConfiguration.
+ SCMRatisServerImpl(final SCMHAConfiguration haConf,
+ final ConfigurationSource conf)
+ throws IOException {
+ this.address = haConf.getRatisBindAddress();
+
+ SCMHAGroupBuilder scmHAGroupBuilder = new SCMHAGroupBuilder(haConf, conf);
+ this.raftPeerId = scmHAGroupBuilder.getPeerId();
+ this.raftGroupId = scmHAGroupBuilder.getRaftGroupId();
+ this.raftGroup = scmHAGroupBuilder.getRaftGroup();
+
+ final RaftProperties serverProperties = RatisUtil
+ .newRaftProperties(haConf, conf);
+ this.scmStateMachine = new SCMStateMachine();
+ this.server = RaftServer.newBuilder()
+ .setServerId(raftPeerId)
+ .setGroup(raftGroup)
+ .setProperties(serverProperties)
+ .setStateMachine(scmStateMachine)
+ .build();
+ }
+
+ @Override
+ public void start() throws IOException {
+ server.start();
+ }
+
+ @Override
+ public void registerStateMachineHandler(final RequestType handlerType,
+ final Object handler) {
+ scmStateMachine.registerHandler(handlerType, handler);
+ }
+
+ @Override
+ public SCMRatisResponse submitRequest(SCMRatisRequest request)
+ throws IOException, ExecutionException, InterruptedException {
+ final RaftClientRequest raftClientRequest = new RaftClientRequest(
+ clientId, server.getId(), raftGroupId, nextCallId(), request.encode(),
+ RaftClientRequest.writeRequestType(), null);
+ final RaftClientReply raftClientReply =
+ server.submitClientRequestAsync(raftClientRequest).get();
+ return SCMRatisResponse.decode(raftClientReply);
+ }
+
+ private long nextCallId() {
+ return callId.getAndIncrement() & Long.MAX_VALUE;
+ }
+
+ @Override
+ public void stop() throws IOException {
+ server.close();
+ }
+
+ @Override
+ public RaftServer getServer() {
+ return server;
+ }
+
+ @Override
+ public RaftGroupId getRaftGroupId() {
+ return raftGroupId;
+ }
+
+ @Override
+ public List<RaftPeer> getRaftPeers() {
- return Collections.singletonList(new RaftPeer(raftPeerId));
++ return Collections.singletonList(RaftPeer.newBuilder()
++ .setId(raftPeerId).build());
+ }
+
+
+ /**
+ * If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES},
+ * its raft peers should locate on different nodes, and use the same port
+ * to communicate with each other.
+ *
+ * Each of the raft peer figures out its {@link RaftPeerId} by computing
+ * its position in {@link ScmConfigKeys#OZONE_SCM_NAMES}.
+ *
+ * Assume {@link ScmConfigKeys#OZONE_SCM_NAMES} is "ip0,ip1,ip2",
+ * scm with ip0 identifies its {@link RaftPeerId} as scm0,
+ * scm with ip1 identifies its {@link RaftPeerId} as scm1,
+ * scm with ip2 identifies its {@link RaftPeerId} as scm2.
+ *
+ * After startup, they will form a {@link RaftGroup} with groupID
+ * "SCM-HA-Service", and communicate with each other via
+ * ozone.scm.ha.ratis.bind.port.
+ */
+ private static class SCMHAGroupBuilder {
+ private final static String SCM_SERVICE_ID = "SCM-HA-Service";
+
+ private final RaftGroupId raftGroupId;
+ private final RaftGroup raftGroup;
+ private RaftPeerId selfPeerId;
+
+ /**
+ * @return raft group
+ */
+ public RaftGroup getRaftGroup() {
+ return raftGroup;
+ }
+
+ /**
+ * @return raft group id
+ */
+ public RaftGroupId getRaftGroupId() {
+ return raftGroupId;
+ }
+
+ /**
+ * @return raft peer id
+ */
+ public RaftPeerId getPeerId() {
+ return selfPeerId;
+ }
+
+ SCMHAGroupBuilder(final SCMHAConfiguration haConf,
+ final ConfigurationSource conf) throws IOException {
+ // fetch port
+ int port = haConf.getRatisBindAddress().getPort();
+
+ // fetch localhost
+ InetAddress localHost = InetAddress.getLocalHost();
+
+ // fetch hosts from ozone.scm.names
+ List<String> hosts =
+ Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES))
+ .map(scmName -> HddsUtils.getHostName(scmName).get())
+ .collect(Collectors.toList());
+
+ final List<RaftPeer> raftPeers = new ArrayList<>();
+ for (int i = 0; i < hosts.size(); ++i) {
+ String nodeId = "scm" + i;
+ RaftPeerId peerId = RaftPeerId.getRaftPeerId(nodeId);
+
+ String host = hosts.get(i);
+ if (InetAddress.getByName(host).equals(localHost)) {
+ selfPeerId = peerId;
+ }
+
- raftPeers.add(new RaftPeer(peerId, host + ":" + port));
++ raftPeers.add(RaftPeer.newBuilder()
++ .setId(peerId)
++ .setAddress(host + ":" + port)
++ .build());
+ }
+
+ if (selfPeerId == null) {
+ String errorMessage = "localhost " + localHost
+ + " does not exist in ozone.scm.names "
+ + conf.get(ScmConfigKeys.OZONE_SCM_NAMES);
+ throw new IOException(errorMessage);
+ }
+
+ LOG.info("Build a RaftGroup for SCMHA, " +
+ "localHost: {}, OZONE_SCM_NAMES: {}, selfPeerId: {}",
+ localHost, conf.get(ScmConfigKeys.OZONE_SCM_NAMES), selfPeerId);
+
+ raftGroupId = RaftGroupId.valueOf(UUID.nameUUIDFromBytes(
+ SCM_SERVICE_ID.getBytes(StandardCharsets.UTF_8)));
+
+ raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);
+ }
+ }
+}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
index 9ab8c66,0000000..3cb56a6
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java
@@@ -1,590 -1,0 +1,596 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupInfoReply;
+import org.apache.ratis.protocol.GroupInfoRequest;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.LifeCycle;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
+import static org.apache.hadoop.hdds.conf.ConfigTag.RATIS;
+import static org.apache.hadoop.hdds.conf.ConfigTag.SCM;
+import static org.apache.hadoop.hdds.conf.ConfigTag.HA;
+
/**
 * Class for SCM Ratis Server.
 *
 * Hosts the Apache Ratis server replicating SCM state and caches this
 * node's leader/follower role, refreshed periodically while leader.
 */
public final class SCMRatisServer {
  private static final Logger LOG = LoggerFactory
      .getLogger(SCMRatisServer.class);

  private final StorageContainerManager scm;
  private final SCMStateMachine scmStateMachine;

  // Local directory where Ratis logs are stored.
  private final String storageDir;
  private final int port;
  private final InetSocketAddress scmRatisAddress;
  private final RaftServer server;
  private final RaftGroupId raftGroupId;
  private final RaftGroup raftGroup;
  private final RaftPeerId raftPeerId;

  // Client id used for local requests (e.g. group info) to the server.
  private final ClientId clientId = ClientId.randomId();
  // Periodically re-checks the server role while this node is leader.
  private final ScheduledExecutorService scheduledRoleChecker;
  private long roleCheckInitialDelayMs = 1000; // 1 second default
  private long roleCheckIntervalMs;
  // Guards the two cached role fields below.
  private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock();
  private Optional<RaftPeerRole> cachedPeerRole = Optional.empty();
  private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty();

  private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
  // Monotonically increasing call id; the mask keeps it non-negative
  // after counter overflow.
  private static long nextCallId() {
    return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
  }
+
  /**
   * Creates a SCM Ratis Server.
   *
   * @param conf Ratis server configuration
   * @param scm owning StorageContainerManager
   * @param ratisStorageDir directory for Ratis logs
   * @param raftGroupIdStr SCM service id, hashed into the RaftGroupId
   * @param localRaftPeerId this node's peer id
   * @param addr local Ratis bind address
   * @param raftPeers all peers in the ring, including this node
   * @throws IOException
   */
  private SCMRatisServer(SCMRatisServerConfiguration conf,
      StorageContainerManager scm, String ratisStorageDir,
      String raftGroupIdStr, RaftPeerId localRaftPeerId,
      InetSocketAddress addr, List<RaftPeer> raftPeers)
      throws IOException {
    this.scm = scm;
    this.scmRatisAddress = addr;
    this.port = addr.getPort();
    this.storageDir = ratisStorageDir;
    // Must run before the role-checker setup below: this call also
    // initializes roleCheckIntervalMs and roleCheckInitialDelayMs.
    RaftProperties serverProperties = newRaftProperties(conf);

    this.raftPeerId = localRaftPeerId;
    this.raftGroupId = RaftGroupId.valueOf(
        getRaftGroupIdFromOmServiceId(raftGroupIdStr));
    this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers);

    StringBuilder raftPeersStr = new StringBuilder();
    for (RaftPeer peer : raftPeers) {
      raftPeersStr.append(", ").append(peer.getAddress());
    }
    // substring(2) strips the leading ", "; assumes raftPeers is
    // non-empty (it always contains the local peer).
    LOG.info("Instantiating SCM Ratis server with GroupID: {} and " +
        "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2));
    this.scmStateMachine = getStateMachine();

    this.server = RaftServer.newBuilder()
        .setServerId(this.raftPeerId)
        .setGroup(this.raftGroup)
        .setProperties(serverProperties)
        .setStateMachine(scmStateMachine)
        .build();

    // Run a scheduler to check and update the server role on the leader
    // periodically
    this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor();
    this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() {
      @Override
      public void run() {
        // Run this check only on the leader SCM
        if (cachedPeerRole.isPresent() &&
            cachedPeerRole.get() == RaftPeerRole.LEADER) {
          updateServerRole();
        }
      }
    }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS);
  }
+
+ /**
+ * Create a SCM Ratis Server instance.
+ */
+ public static SCMRatisServer newSCMRatisServer(
+ SCMRatisServerConfiguration conf, StorageContainerManager scm,
+ SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers,
+ String ratisStorageDir)
+ throws IOException {
+ String scmServiceId = scmNodeDetails.getSCMServiceId();
+
+ String scmNodeId = scmNodeDetails.getSCMNodeId();
+ RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(scmNodeId);
+ InetSocketAddress ratisAddr = new InetSocketAddress(
+ scmNodeDetails.getAddress(), scmNodeDetails.getRatisPort());
+
- RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr);
++ RaftPeer localRaftPeer = RaftPeer.newBuilder()
++ .setId(localRaftPeerId)
++ .setAddress(ratisAddr)
++ .build();
+
+ List<RaftPeer> raftPeers = new ArrayList<>();
+ raftPeers.add(localRaftPeer);
+
+ for (SCMNodeDetails peer : peers) {
+ String peerNodeId = peer.getSCMNodeId();
+ InetSocketAddress peerRatisAddr = new InetSocketAddress(
+ peer.getAddress(), peer.getRatisPort());
+ RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId);
- RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr);
++ RaftPeer raftPeer = RaftPeer.newBuilder()
++ .setId(raftPeerId)
++ .setAddress(peerRatisAddr)
++ .build();
+ // Add other SCMs in Ratis ring
+ raftPeers.add(raftPeer);
+ }
+
+ return new SCMRatisServer(conf, scm, ratisStorageDir, scmServiceId,
+ localRaftPeerId, ratisAddr, raftPeers);
+ }
+
  // Maps a service id deterministically to a UUID used as RaftGroupId.
  // NOTE(review): the name says "OmServiceId" but the argument is the
  // SCM service id — looks copied from the OM Ratis server; consider
  // renaming once all callers are updated.
  private UUID getRaftGroupIdFromOmServiceId(String scmServiceId) {
    return UUID.nameUUIDFromBytes(scmServiceId.getBytes(
        StandardCharsets.UTF_8));
  }
+
  /** Creates the state machine bound to this Ratis server instance. */
  private SCMStateMachine getStateMachine() {
    return new SCMStateMachine(this);
  }
+
+ private RaftProperties newRaftProperties(SCMRatisServerConfiguration conf) {
+ final RaftProperties properties = new RaftProperties();
+ // Set RPC type
+ final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(conf.getRpcType());
+ RaftConfigKeys.Rpc.setType(properties, rpc);
+ // Set the ratis port number
+ if (rpc == SupportedRpcType.GRPC) {
+ GrpcConfigKeys.Server.setPort(properties, port);
+ } else if (rpc == SupportedRpcType.NETTY) {
+ NettyConfigKeys.Server.setPort(properties, port);
+ }
+ // Set Ratis storage directory
+ RaftServerConfigKeys.setStorageDir(properties,
+ Collections.singletonList(new File(storageDir)));
+ // Set RAFT segment size
+ RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+ SizeInBytes.valueOf((long)conf.getSegmentSize()));
+ // Set RAFT segment pre-allocated size
+ RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
+ (int)conf.getLogAppenderQueueByteLimit());
+ RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
+ SizeInBytes.valueOf(conf.getLogAppenderQueueNum()));
+ RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+ SizeInBytes.valueOf((int)conf.getPreallocatedSize()));
+ RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties,
+ false);
+ RaftServerConfigKeys.Log.setPurgeGap(properties, conf.getLogPurgeGap());
+ // For grpc set the maximum message size
+ // TODO: calculate the optimal max message size
+ GrpcConfigKeys.setMessageSizeMax(properties,
+ SizeInBytes.valueOf((int)conf.getLogAppenderQueueByteLimit()));
+
+ // Set the server request timeout
+ final TimeDuration serverRequestTimeout = TimeDuration.valueOf(
+ conf.getRequestTimeout(), TimeUnit.MILLISECONDS);
+ RaftServerConfigKeys.Rpc.setRequestTimeout(properties,
+ serverRequestTimeout);
+ // Set timeout for server retry cache entry
+ final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
+ conf.getRetryCacheTimeout(), TimeUnit.MILLISECONDS);
+ RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
+ retryCacheTimeout);
+ // Set the server min and max timeout
+ final TimeDuration serverMinTimeout = TimeDuration.valueOf(
+ conf.getMinTimeout(), TimeUnit.MILLISECONDS);
+ final TimeDuration serverMaxTimeout = TimeDuration.valueOf(
+ conf.getMinTimeout() + 200L, TimeUnit.MILLISECONDS);
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+ serverMinTimeout);
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+ serverMaxTimeout);
+ // Set the number of maximum cached segments
+ RaftServerConfigKeys.Log.setSegmentCacheNumMax(properties, 2);
+ // TODO: set max write buffer size
+ // Set the ratis leader election timeout
+ final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
+ conf.getMinLeaderElectionTimeout(), TimeUnit.MILLISECONDS);
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+ leaderElectionMinTimeout);
+ long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
+ TimeUnit.MILLISECONDS) + 200;
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+ TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
+
+ final TimeDuration nodeFailureTimeout = TimeDuration.valueOf(
+ conf.getFailureTimeout(), TimeUnit.MILLISECONDS);
+ RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties,
+ nodeFailureTimeout);
+ RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+ nodeFailureTimeout);
+
+ // Ratis leader role check
+ this.roleCheckIntervalMs = conf.getRoleCheckerInterval();
+ this.roleCheckInitialDelayMs = leaderElectionMinTimeout
+ .toLong(TimeUnit.MILLISECONDS);
+
+ return properties;
+ }
+
  /**
   * Start the Ratis server.
   * @throws IOException if the underlying Ratis server fails to start
   */
  public void start() throws IOException {
    LOG.info("Starting {} {} at port {}", getClass().getSimpleName(),
        server.getId(), port);
    server.start();
  }
+
  /**
   * Stop the Ratis server.
   *
   * Closes the Raft server first, then stops the state machine. An
   * IOException during shutdown is rethrown unchecked since callers
   * cannot meaningfully recover at this point.
   */
  public void stop() {
    try {
      server.close();
      scmStateMachine.stop();
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }
+
+ private boolean checkCachedPeerRoleIsLeader() {
+ this.roleCheckLock.readLock().lock();
+ try {
+ if (cachedPeerRole.isPresent() &&
+ cachedPeerRole.get() ==RaftPeerRole.LEADER) {
+ return true;
+ }
+ return false;
+ } finally {
+ this.roleCheckLock.readLock().unlock();
+ }
+ }
+
  /**
   * Checks whether this SCM is currently the Ratis leader.
   *
   * Fast path trusts the cached role; on a non-leader cache value the
   * role is refreshed from the Ratis server and re-checked so a newly
   * elected leader is detected promptly.
   */
  public boolean isLeader() {
    if (checkCachedPeerRoleIsLeader()) {
      return true;
    }

    // Get the server role from ratis server and update the cached values.
    updateServerRole();

    // After updating the server role, check and return if leader or not.
    return checkCachedPeerRoleIsLeader();
  }
+
  @VisibleForTesting
  public LifeCycle.State getServerState() {
    return server.getLifeCycleState();
  }

  @VisibleForTesting
  public RaftPeerId getRaftPeerId() {
    return this.raftPeerId;
  }

  public RaftGroup getRaftGroup() {
    return this.raftGroup;
  }

  /**
   * Get the local directory where ratis logs will be stored.
   * Falls back to the default Ratis directory when
   * ozone.scm.ratis.storage.dir is unset.
   */
  public static String getSCMRatisDirectory(ConfigurationSource conf) {
    String storageDir = conf.get(ScmConfigKeys.OZONE_SCM_RATIS_STORAGE_DIR);

    if (Strings.isNullOrEmpty(storageDir)) {
      storageDir = ServerUtils.getDefaultRatisDirectory(conf);
    }
    return storageDir;
  }

  /**
   * Returns the cached leader peer id; may be empty when no leader is
   * known. Guarded by the role-check read lock.
   */
  public Optional<RaftPeerId> getCachedLeaderPeerId() {
    this.roleCheckLock.readLock().lock();
    try {
      return cachedLeaderPeerId;
    } finally {
      this.roleCheckLock.readLock().unlock();
    }
  }

  public StorageContainerManager getSCM() {
    return scm;
  }

  @VisibleForTesting
  public SCMStateMachine getScmStateMachine() {
    return scmStateMachine;
  }

  public int getServerPort() {
    return port;
  }
+
  /**
   * Queries the Ratis server for this node's current role and refreshes
   * the cached role and leader id accordingly. On failure both caches
   * are cleared (note: the log message mentions UNRECOGNIZED, but the
   * cached role is actually reset to empty).
   */
  public void updateServerRole() {
    try {
      GroupInfoReply groupInfo = getGroupInfo();
      RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto();
      RaftPeerRole thisNodeRole = roleInfoProto.getRole();

      if (thisNodeRole.equals(RaftPeerRole.LEADER)) {
        setServerRole(thisNodeRole, raftPeerId);

      } else if (thisNodeRole.equals(RaftPeerRole.FOLLOWER)) {
        ByteString leaderNodeId = roleInfoProto.getFollowerInfo()
            .getLeaderInfo().getId().getId();
        // There may be a chance, here we get leaderNodeId as null. For
        // example, in 3 node SCM Ratis, if 2 SCM nodes are down, there
        // will be no leader.
        RaftPeerId leaderPeerId = null;
        if (leaderNodeId != null && !leaderNodeId.isEmpty()) {
          leaderPeerId = RaftPeerId.valueOf(leaderNodeId);
        }

        setServerRole(thisNodeRole, leaderPeerId);

      } else {
        // CANDIDATE or other transitional role: no known leader.
        setServerRole(thisNodeRole, null);

      }
    } catch (IOException e) {
      LOG.error("Failed to retrieve RaftPeerRole. Setting cached role to " +
          "{} and resetting leader info.", RaftPeerRole.UNRECOGNIZED, e);
      setServerRole(null, null);
    }
  }
+
  /** Returns the last term-index applied by the SCM state machine. */
  public TermIndex getLastAppliedTermIndex() {
    return scmStateMachine.getLastAppliedTermIndex();
  }
+
+ private GroupInfoReply getGroupInfo() throws IOException {
+ GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId,
+ raftPeerId, raftGroupId, nextCallId());
+ GroupInfoReply groupInfo = server.getGroupInfo(groupInfoRequest);
+ return groupInfo;
+ }
+
  /**
   * Atomically updates the cached role and leader id under the write
   * lock. Either argument may be null to clear that cache entry.
   */
  private void setServerRole(RaftPeerRole currentRole,
      RaftPeerId leaderPeerId) {
    this.roleCheckLock.writeLock().lock();
    try {
      this.cachedPeerRole = Optional.ofNullable(currentRole);
      this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId);
    } finally {
      this.roleCheckLock.writeLock().unlock();
    }
  }
+
  /**
   * Configuration used by SCM Ratis Server.
   *
   * Field initializers mirror the {@code defaultValue} declared on each
   * {@code @Config} annotation; the config framework injects the actual
   * values at startup.
   */
  @ConfigGroup(prefix = "ozone.scm.ratis")
  public static class SCMRatisServerConfiguration {
    @Config(key = "rpc.type",
        type = ConfigType.STRING,
        defaultValue = "GRPC",
        tags = {SCM, OZONE, HA, RATIS},
        description = "Ratis supports different kinds of transports like" +
            " netty, GRPC, Hadoop RPC etc. This picks one of those for" +
            " this cluster."
    )
    private String rpcType;

    @Config(key = "segment.size",
        type = ConfigType.SIZE,
        defaultValue = "16KB",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The size of the raft segment used by Apache Ratis on" +
            " SCM. (16 KB by default)"
    )
    private double segmentSize = 16 * 1024;

    @Config(key = "segment.preallocated.size",
        type = ConfigType.SIZE,
        defaultValue = "16KB",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The size of the buffer which is preallocated for" +
            " raft segment used by Apache Ratis on SCM.(16 KB by default)"
    )
    private double preallocatedSize = 16 * 1024;

    @Config(key = "log.appender.queue.num-elements",
        type = ConfigType.INT,
        defaultValue = "1024",
        tags = {SCM, OZONE, HA, RATIS},
        description = "Number of operation pending with Raft's Log Worker."
    )
    private int logAppenderQueueNum = 1024;

    @Config(key = "log.appender.queue.byte-limit",
        type = ConfigType.SIZE,
        defaultValue = "32MB",
        tags = {SCM, OZONE, HA, RATIS},
        description = "Byte limit for Raft's Log Worker queue."
    )
    private double logAppenderQueueByteLimit = 32 * 1024 * 1024;

    @Config(key = "log.purge.gap",
        type = ConfigType.INT,
        defaultValue = "1000000",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The minimum gap between log indices for Raft server to" +
            " purge its log segments after taking snapshot."
    )
    private int logPurgeGap = 1000000;

    @Config(key = "server.request.timeout",
        type = ConfigType.TIME,
        defaultValue = "3s",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The timeout duration for SCM's ratis server request."
    )
    private long requestTimeout = 3 * 1000L;

    @Config(key = "server.retry.cache.timeout",
        type = ConfigType.TIME,
        defaultValue = "60s",
        tags = {SCM, OZONE, HA, RATIS},
        description = "Retry Cache entry timeout for SCM's ratis server."
    )
    private long retryCacheTimeout = 60 * 1000L;

    @Config(key = "minimum.timeout",
        type = ConfigType.TIME,
        defaultValue = "1s",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The minimum timeout duration for SCM's Ratis server rpc."
    )
    private long minTimeout = 1 * 1000L;

    @Config(key = "leader.election.minimum.timeout.duration",
        type = ConfigType.TIME,
        defaultValue = "1s",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The minimum timeout duration for SCM ratis leader" +
            " election. Default is 1s."
    )
    private long minLeaderElectionTimeout = 1 * 1000L;

    @Config(key = "server.failure.timeout.duration",
        type = ConfigType.TIME,
        defaultValue = "120s",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The timeout duration for ratis server failure" +
            " detection, once the threshold has reached, the ratis state" +
            " machine will be informed about the failure in the ratis ring."
    )
    private long failureTimeout = 120 * 1000L;

    @Config(key = "server.role.check.interval",
        type = ConfigType.TIME,
        defaultValue = "15s",
        tags = {SCM, OZONE, HA, RATIS},
        description = "The interval between SCM leader performing a role" +
            " check on its ratis server. Ratis server informs SCM if it loses" +
            " the leader role. The scheduled check is an secondary check to" +
            " ensure that the leader role is updated periodically"
    )
    private long roleCheckerInterval = 15 * 1000L;

    public String getRpcType() {
      return rpcType;
    }

    public double getSegmentSize() {
      return segmentSize;
    }

    public double getPreallocatedSize() {
      return preallocatedSize;
    }

    public int getLogAppenderQueueNum() {
      return logAppenderQueueNum;
    }

    public double getLogAppenderQueueByteLimit() {
      return logAppenderQueueByteLimit;
    }

    public int getLogPurgeGap() {
      return logPurgeGap;
    }

    public long getRequestTimeout() {
      return requestTimeout;
    }

    public long getRetryCacheTimeout() {
      return retryCacheTimeout;
    }

    public long getMinTimeout() {
      return minTimeout;
    }

    public long getMinLeaderElectionTimeout() {
      return minLeaderElectionTimeout;
    }

    public long getFailureTimeout() {
      return failureTimeout;
    }

    public long getRoleCheckerInterval() {
      return roleCheckerInterval;
    }
  }
+}
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
index 9a725a6,0000000..1bc71e4
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java
@@@ -1,242 -1,0 +1,241 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.server.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.util.LifeCycle;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Class for SCM StateMachine.
+ */
+public class SCMStateMachine extends BaseStateMachine {
+ static final Logger LOG =
+ LoggerFactory.getLogger(SCMStateMachine.class);
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage();
+ private final SCMRatisServer scmRatisServer;
+ private final StorageContainerManager scm;
+ private RaftGroupId raftGroupId;
+ private final SCMRatisSnapshotInfo snapshotInfo;
+ private final ExecutorService executorService;
+ private final ExecutorService installSnapshotExecutor;
+
+ // Map which contains index and term for the ratis transactions which are
+ // stateMachine entries which are received through applyTransaction.
+ private ConcurrentMap<Long, Long> applyTransactionMap =
+ new ConcurrentSkipListMap<>();
+
+ /**
+ * Create a SCM state machine.
+ */
+ public SCMStateMachine(SCMRatisServer ratisServer) {
+ this.scmRatisServer = ratisServer;
+ this.scm = ratisServer.getSCM();
+
+ // TODO: remove the whole file later
+ this.snapshotInfo = null;
+ updateLastAppliedIndexWithSnaphsotIndex();
+
+ ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("SCM StateMachine ApplyTransaction Thread - %d").build();
+ this.executorService = HadoopExecutors.newSingleThreadExecutor(build);
+ this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor();
+ }
+
+ /**
+ * Initializes the State Machine with the given server, group and storage.
+ */
+ @Override
+ public void initialize(RaftServer server, RaftGroupId id,
+ RaftStorage raftStorage) throws IOException {
+ getLifeCycle().startAndTransition(() -> {
+ super.initialize(server, id, raftStorage);
+ this.raftGroupId = id;
+ storage.init(raftStorage);
+ });
+ }
+
+ /**
+ * Pre-execute the update request into state machine.
+ */
+ @Override
+ public TransactionContext startTransaction(
+ RaftClientRequest raftClientRequest) {
+ return TransactionContext.newBuilder()
+ .setClientRequest(raftClientRequest)
+ .setStateMachine(this)
+ .setServerRole(RaftProtos.RaftPeerRole.LEADER)
+ .setLogData(raftClientRequest.getMessage().getContent())
+ .build();
+ }
+
+ /**
+ * Apply a committed log entry to state machine.
+ */
+ @Override
+ public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+ CompletableFuture<Message> ratisFuture =
+ new CompletableFuture<>();
+ //TODO execute SCMRequest and process SCMResponse
+ return ratisFuture;
+ }
+
+ /**
+ * Query state machine.
+ */
+ @Override
+ public CompletableFuture<Message> query(Message request) {
+ //TODO make handler respond to the query request.
+ return CompletableFuture.completedFuture(request);
+ }
+
+ /**
+ * Pause state machine.
+ */
+ @Override
+ public void pause() {
+ getLifeCycle().transition(LifeCycle.State.PAUSING);
+ getLifeCycle().transition(LifeCycle.State.PAUSED);
+ }
+
+ /**
+ * Unpause the state machine and update the lastAppliedIndex.
+ * Invoked after uploading new state to the state machine.
+ */
+ public void unpause(long newLastAppliedSnaphsotIndex,
+ long newLastAppliedSnapShotTermIndex) {
+ getLifeCycle().startAndTransition(() -> {
+ this.setLastAppliedTermIndex(TermIndex.newTermIndex(
+ newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex));
+ });
+ }
+
+ /**
+ * Take SCM snapshot and write index to file.
+ * @return actual index or 0 if error.
+ */
+ @Override
+ public long takeSnapshot() throws IOException {
+ LOG.info("Saving Ratis snapshot on the SCM.");
+ if (scm != null) {
+ // TODO: remove the whole file later
+ return 0;
+ }
+ return 0;
+ }
+
+ /**
+ * Get latest SCM snapshot.
+ */
+ @Override
+ public SnapshotInfo getLatestSnapshot() {
+ return snapshotInfo;
+ }
+
+ private synchronized void updateLastApplied() {
+ Long appliedTerm = null;
+ long appliedIndex = -1;
+ for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
+ final Long removed = applyTransactionMap.remove(i);
+ if (removed == null) {
+ break;
+ }
+ appliedTerm = removed;
+ appliedIndex = i;
+ }
+ if (appliedTerm != null) {
+ updateLastAppliedTermIndex(appliedTerm, appliedIndex);
+ }
+ }
+
+ /**
+ * Called to notify the state machine about indexes which are processed
+ * internally by the Raft Server; this currently happens when conf entries
+ * are processed in the Raft Server. This keeps the state machine aware of
+ * index updates.
+ */
- @Override
+ public void notifyIndexUpdate(long currentTerm, long index) {
+ applyTransactionMap.put(index, currentTerm);
+ updateLastApplied();
+ snapshotInfo.updateTerm(currentTerm);
+ }
+
+ /**
+ * Notifies the state machine that the raft peer is no longer leader.
+ */
+ @Override
+ public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
+ scmRatisServer.updateServerRole();
+ }
+
+ /**
+ * Converts a log entry to its string representation.
+ */
+ @Override
+ public String toStateMachineLogEntryString(
+ RaftProtos.StateMachineLogEntryProto proto) {
+ //TODO implement transfer from proto to SCMRequest body.
+ return null;
+ }
+
+ /**
+ * Update the lastAppliedTermIndex from the snapshot info.
+ */
+ public void updateLastAppliedIndexWithSnaphsotIndex() {
+ setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(),
+ snapshotInfo.getIndex()));
+ LOG.info("LastAppliedIndex set from SnapShotInfo {}",
+ getLastAppliedTermIndex());
+ }
+
+ @VisibleForTesting
+ void addApplyTransactionTermIndex(long term, long index) {
+ applyTransactionMap.put(index, term);
+ }
+
+ public void stop() {
+ HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS);
+ HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS);
+ }
+}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
index 886eaee,0000000..ac58438
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/MockSCMHAManager.java
@@@ -1,208 -1,0 +1,224 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.EnumMap;
+import java.util.List;
+import java.util.Map;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+
+/**
+ * Mock SCMHAManager implementation for testing.
+ */
+public final class MockSCMHAManager implements SCMHAManager {
+
+ private final SCMRatisServer ratisServer;
+ private boolean isLeader;
+
+ public static SCMHAManager getInstance() {
+ return new MockSCMHAManager();
+ }
+
+ public static SCMHAManager getLeaderInstance() {
+ MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
+ mockSCMHAManager.setIsLeader(true);
+ return mockSCMHAManager;
+ }
+
+ public static SCMHAManager getFollowerInstance() {
+ MockSCMHAManager mockSCMHAManager = new MockSCMHAManager();
+ mockSCMHAManager.setIsLeader(false);
+ return mockSCMHAManager;
+ }
+
+ /**
+ * Creates MockSCMHAManager instance.
+ */
+ private MockSCMHAManager() {
+ this.ratisServer = new MockRatisServer();
+ this.isLeader = true;
+ }
+
+ @Override
+ public void start() throws IOException {
+ ratisServer.start();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public boolean isLeader() {
+ return isLeader;
+ }
+
+ public void setIsLeader(boolean isLeader) {
+ this.isLeader = isLeader;
+ }
+
+ @Override
+ public RaftPeer getSuggestedLeader() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public SCMRatisServer getRatisServer() {
+ return ratisServer;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public void shutdown() throws IOException {
+ ratisServer.stop();
+ }
+
+ @Override
+ public List<String> getRatisRoles() {
+ return Arrays.asList(
+ "180.3.14.5:9865",
+ "180.3.14.21:9865",
+ "180.3.14.145:9865");
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public NotLeaderException triggerNotLeaderException() {
+ return new NotLeaderException(RaftGroupMemberId.valueOf(
+ RaftPeerId.valueOf("peer"), RaftGroupId.randomId()),
+ null, new ArrayList<>());
+ }
+
+ private static class MockRatisServer implements SCMRatisServer {
+
+ private Map<RequestType, Object> handlers =
+ new EnumMap<>(RequestType.class);
+
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void registerStateMachineHandler(final RequestType handlerType,
+ final Object handler) {
+ handlers.put(handlerType, handler);
+ }
+
+ @Override
+ public SCMRatisResponse submitRequest(final SCMRatisRequest request)
+ throws IOException {
+ final RaftGroupMemberId raftId = RaftGroupMemberId.valueOf(
+ RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
+ RaftClientReply reply;
+ try {
+ final Message result = process(request);
- return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
- raftId, 1L, true, result, null, 1L, null));
++ reply = RaftClientReply.newBuilder()
++ .setClientId(ClientId.randomId())
++ .setServerId(raftId)
++ .setGroupId(RaftGroupId.emptyGroupId())
++ .setCallId(1L)
++ .setSuccess(true)
++ .setMessage(result)
++ .setException(null)
++ .setLogIndex(1L)
++ .build();
+ } catch (Exception ex) {
- return SCMRatisResponse.decode(new RaftClientReply(ClientId.randomId(),
- raftId, 1L, false, null,
- new StateMachineException(raftId, ex), 1L, null));
++ reply = RaftClientReply.newBuilder()
++ .setClientId(ClientId.randomId())
++ .setServerId(raftId)
++ .setGroupId(RaftGroupId.emptyGroupId())
++ .setCallId(1L)
++ .setSuccess(false)
++ .setMessage(Message.EMPTY)
++ .setException(new StateMachineException(raftId, ex))
++ .setLogIndex(1L)
++ .build();
+ }
++ return SCMRatisResponse.decode(reply);
+ }
+
+ private Message process(final SCMRatisRequest request)
+ throws Exception {
+ try {
+ final Object handler = handlers.get(request.getType());
+
+ if (handler == null) {
+ throw new IOException("No handler found for request type " +
+ request.getType());
+ }
+
+ final List<Class<?>> argumentTypes = new ArrayList<>();
+ for(Object args : request.getArguments()) {
+ argumentTypes.add(args.getClass());
+ }
+ final Object result = handler.getClass().getMethod(
+ request.getOperation(), argumentTypes.toArray(new Class<?>[0]))
+ .invoke(handler, request.getArguments());
+
+ return SCMRatisResponse.encode(result);
+ } catch (NoSuchMethodException | SecurityException ex) {
+ throw new InvalidProtocolBufferException(ex.getMessage());
+ } catch (InvocationTargetException e) {
+ final Exception targetEx = (Exception) e.getTargetException();
+ throw targetEx != null ? targetEx : e;
+ }
+ }
+
+ @Override
+ public RaftServer getServer() {
+ return null;
+ }
+
+ @Override
+ public RaftGroupId getRaftGroupId() {
+ return null;
+ }
+
+ @Override
+ public List<RaftPeer> getRaftPeers() {
+ return new ArrayList<>();
+ }
+
+ @Override
+ public void stop() {
+ }
+ }
+
+}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisResponse.java
index 05e2970,0000000..7ecbf2a
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisResponse.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMRatisResponse.java
@@@ -1,73 -1,0 +1,89 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.RaftException;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for SCMRatisResponse.
+ */
+public class TestSCMRatisResponse {
+ private RaftGroupMemberId raftId;
+
+ @Before
+ public void init() {
+ raftId = RaftGroupMemberId.valueOf(
+ RaftPeerId.valueOf("peer"), RaftGroupId.randomId());
+ }
+
+ @Test
+ public void testEncodeAndDecodeSuccess() throws Exception {
- SCMRatisResponse response = SCMRatisResponse.decode(new RaftClientReply(
- ClientId.randomId(), raftId, 1L, true, Message.EMPTY,
- null, 1L, null));
++ RaftClientReply reply = RaftClientReply.newBuilder()
++ .setClientId(ClientId.randomId())
++ .setServerId(raftId)
++ .setGroupId(RaftGroupId.emptyGroupId())
++ .setCallId(1L)
++ .setSuccess(true)
++ .setMessage(Message.EMPTY)
++ .setException(null)
++ .setLogIndex(1L)
++ .build();
++ SCMRatisResponse response = SCMRatisResponse.decode(reply);
+ Assert.assertTrue(response.isSuccess());
+ Assert.assertEquals(Message.EMPTY,
+ SCMRatisResponse.encode(response.getResult()));
+ }
+
+ @Test
+ public void testDecodeOperationFailureWithException() throws Exception {
- SCMRatisResponse response = SCMRatisResponse.decode(new RaftClientReply(
- ClientId.randomId(), raftId, 1L, false, Message.EMPTY,
- new LeaderNotReadyException(raftId), 1L, null));
++ RaftClientReply reply = RaftClientReply.newBuilder()
++ .setClientId(ClientId.randomId())
++ .setServerId(raftId)
++ .setGroupId(RaftGroupId.emptyGroupId())
++ .setCallId(1L)
++ .setSuccess(false)
++ .setMessage(Message.EMPTY)
++ .setException(new LeaderNotReadyException(raftId))
++ .setLogIndex(1L)
++ .build();
++ SCMRatisResponse response = SCMRatisResponse.decode(reply);
+ Assert.assertFalse(response.isSuccess());
+ Assert.assertTrue(response.getException() instanceof RaftException);
+ Assert.assertNull(response.getResult());
+ }
+
+ @Test(expected = InvalidProtocolBufferException.class)
+ public void testEncodeFailureWithNonProto() throws Exception {
+ // Non proto input
+ Message message = Message.valueOf("test");
+ // Should fail with exception.
+ SCMRatisResponse.encode(message);
+ }
+}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index f35318d,e770ba9..8426084
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@@ -145,17 -136,8 +147,18 @@@ public class TestHealthyPipelineSafeMod
Pipeline pipeline3 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
+ pipelineManager.openPipeline(pipeline3.getId());
+ // Mark pipeline healthy
+ pipeline1 = pipelineManager.getPipeline(pipeline1.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline1);
+
+ pipeline2 = pipelineManager.getPipeline(pipeline2.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline2);
+
+ pipeline3 = pipelineManager.getPipeline(pipeline3.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
+
SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
config, containers, pipelineManager, eventQueue);
@@@ -236,16 -215,8 +241,17 @@@
Pipeline pipeline3 =
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
+ pipelineManager.openPipeline(pipeline3.getId());
+ // Mark pipeline healthy
+ pipeline1 = pipelineManager.getPipeline(pipeline1.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline1);
+
+ pipeline2 = pipelineManager.getPipeline(pipeline2.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline2);
+
+ pipeline3 = pipelineManager.getPipeline(pipeline3.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
config, containers, pipelineManager, eventQueue);
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 5aa67a3,6430247..6860da2
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@@ -27,13 -32,10 +32,16 @@@ import org.apache.hadoop.hdds.scm.HddsT
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
-import org.apache.hadoop.hdds.scm.pipeline.*;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
++import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
++import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.GenericTestUtils;
@@@ -51,8 -53,9 +59,9 @@@ public class TestOneReplicaPipelineSafe
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private OneReplicaPipelineSafeModeRule rule;
- private SCMPipelineManager pipelineManager;
+ private PipelineManagerV2Impl pipelineManager;
private EventQueue eventQueue;
+ private MockNodeManager mockNodeManager;
private void setup(int nodes, int pipelineFactorThreeCount,
int pipelineFactorOneCount) throws Exception {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 7901883,7831307..f081bac
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@@ -35,15 -37,11 +37,16 @@@ import org.apache.hadoop.hdds.scm.HddsT
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
-import org.apache.hadoop.hdds.scm.pipeline.*;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@@ -385,15 -362,11 +388,16 @@@ public class TestSCMSafeModeManager
mockRatisProvider);
pipelineManager.allowPipelineCreation();
- for (int i=0; i < pipelineCount; i++) {
- Pipeline pipeline = pipelineManager.
- createPipeline(HddsProtos.ReplicationType.RATIS,
+ for (int i = 0; i < pipelineCount; i++) {
+ // Create pipeline
+ Pipeline pipeline = pipelineManager.createPipeline(
+ HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
+
+ pipelineManager.openPipeline(pipeline.getId());
+ // Mark pipeline healthy
+ pipeline = pipelineManager.getPipeline(pipeline.getId());
+ MockRatisPipelineProvider.markPipelineHealthy(pipeline);
}
for (ContainerInfo container : containers) {
diff --cc hadoop-ozone/fault-injection-test/mini-chaos-tests/pom.xml
index 5523150,5523150..66a44a3
--- a/hadoop-ozone/fault-injection-test/mini-chaos-tests/pom.xml
+++ b/hadoop-ozone/fault-injection-test/mini-chaos-tests/pom.xml
@@@ -50,6 -50,6 +50,12 @@@ http://maven.apache.org/xsd/maven-4.0.0
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
++ <artifactId>hadoop-hdds-server-scm</artifactId>
++ <scope>test</scope>
++ <type>test-jar</type>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdds-hadoop-dependency-test</artifactId>
<scope>test</scope>
</dependency>
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 394e43c,629ab5a..099b069
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@@ -54,8 -40,8 +40,9 @@@ import org.apache.hadoop.hdds.annotatio
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+ import org.apache.hadoop.hdds.scm.OzoneClientConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index ce696c5,135fb51..c8c05df
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@@ -17,37 -17,8 +17,40 @@@
*/
package org.apache.hadoop.ozone;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
+ .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+ .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.hdds.scm.TestUtils;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.file.Path;
+import java.nio.file.Paths;
++import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@@ -104,8 -76,27 +108,6 @@@ import org.mockito.Mockito
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
- import com.google.common.collect.Lists;
- import com.google.common.collect.Maps;
-import java.io.File;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.time.Duration;
-import java.util.Map;
-import java.util.List;
-import java.util.Set;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.*;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
/**
* Test class that exercises the StorageContainerManager.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org