You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by ad...@apache.org on 2022/03/25 11:36:35 UTC
[ozone] 01/01: Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master
This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 9ec90f5820aa9143393f5aa392ed9c1f234e9c2c
Merge: f386662 75f5501
Author: Doroszlai, Attila <ad...@apache.org>
AuthorDate: Thu Mar 24 19:19:08 2022 +0100
Merge remote-tracking branch 'origin/master' into HDDS-3816-ec-merge-master
.github/workflows/post-commit.yml | 12 +-
dev-support/annotations/pom.xml | 114 +++++
.../RequestFeatureValidatorProcessor.java | 289 ++++++++++++
.../org/apache/ozone/annotations/package-info.java | 5 +
.../services/javax.annotation.processing.Processor | 19 +-
.../hadoop/hdds/scm/storage/BlockInputStream.java | 2 +-
.../storage/DummyBlockInputStreamWithRetry.java | 2 +-
hadoop-hdds/common/pom.xml | 13 +-
...DatanodeVersions.java => ComponentVersion.java} | 24 +-
.../org/apache/hadoop/hdds/DatanodeVersion.java | 71 +++
.../org/apache/hadoop/hdds/HddsConfigKeys.java | 1 -
.../hadoop/hdds/client/RatisReplicationConfig.java | 34 +-
.../hadoop/hdds/client/ReplicationConfig.java | 4 +-
.../hdds/client/StandaloneReplicationConfig.java | 34 +-
.../hadoop/hdds/protocol/DatanodeDetails.java | 13 +-
.../org/apache/hadoop/hdds/ratis/RatisHelper.java | 66 ++-
.../RequestTypeDependentRetryPolicyCreator.java | 6 +-
.../hadoop/hdds/scm/container/ContainerInfo.java | 21 +-
.../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 9 +-
.../apache/hadoop/hdds/scm/net/InnerNodeImpl.java | 2 +-
.../protocol/StorageContainerLocationProtocol.java | 11 +-
.../org/apache/hadoop/ozone/ClientVersion.java | 75 +++
.../org/apache/hadoop/ozone/OzoneConfigKeys.java | 32 +-
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 4 +
.../apache/hadoop/ozone/OzoneManagerVersion.java | 72 +++
.../common/src/main/resources/ozone-default.xml | 35 +-
.../hdds/TestComponentVersionInvariants.java | 98 ++++
.../client/TestReplicationConfigValidator.java | 16 +-
.../hadoop/hdds/protocol/TestDatanodeDetails.java | 11 +-
.../hdds/scm/container/TestContainerInfo.java | 2 +-
.../hadoop/hdds/scm/pipeline/MockPipeline.java | 4 +-
.../hadoop/hdds/scm/pipeline/TestPipeline.java | 9 +-
.../ozone/container/ContainerTestHelper.java | 68 ---
.../TestDefaultUpgradeFinalizationExecutor.java | 3 +-
.../apache/hadoop/ozone/HddsDatanodeService.java | 8 +-
.../ozone/container/common/impl/ContainerData.java | 46 +-
.../ozone/container/common/impl/ContainerSet.java | 16 +-
.../common/interfaces/ContainerInspector.java | 72 +++
.../ozone/container/common/interfaces/Handler.java | 12 +-
.../common/report/ContainerReportPublisher.java | 5 +-
.../common/report/IncrementalReportSender.java | 30 ++
.../common/statemachine/DatanodeStateMachine.java | 6 +-
.../common/statemachine/StateContext.java | 131 +++---
.../CreatePipelineCommandHandler.java | 13 +-
.../states/datanode/RunningDatanodeState.java | 8 +-
.../states/endpoint/HeartbeatEndpointTask.java | 23 +-
.../server/ratis/ContainerStateMachine.java | 7 +-
.../transport/server/ratis/XceiverServerRatis.java | 18 +-
.../common/utils/ContainerInspectorUtil.java | 87 ++++
.../container/common/volume/AbstractFuture.java | 13 +-
.../container/keyvalue/KeyValueContainer.java | 76 ++-
.../container/keyvalue/KeyValueContainerData.java | 10 +-
.../KeyValueContainerMetadataInspector.java | 463 ++++++++++++++++++
.../ozone/container/keyvalue/KeyValueHandler.java | 95 ++--
.../container/keyvalue/TarContainerPacker.java | 55 ++-
.../container/keyvalue/helpers/ChunkUtils.java | 4 +
.../keyvalue/helpers/KeyValueContainerUtil.java | 23 +-
.../container/keyvalue/impl/BlockManagerImpl.java | 104 ++--
.../keyvalue/impl/ChunkManagerDispatcher.java | 18 +-
.../background/BlockDeletingService.java | 108 +++--
.../container/metadata/AbstractDatanodeStore.java | 6 +-
.../ozone/container/ozoneimpl/ContainerReader.java | 6 +-
.../ozone/container/ozoneimpl/OzoneContainer.java | 40 +-
.../container/replication/MeasuredReplicator.java | 3 +-
.../commands/RefreshVolumeUsageCommand.java | 57 +++
.../hadoop/ozone/container/common/ScmTestMock.java | 7 +-
.../container/common/TestBlockDeletingService.java | 6 +-
.../common/TestKeyValueContainerData.java | 6 +-
.../TestSchemaOneBackwardsCompatibility.java | 2 +-
.../common/helpers/TestDatanodeVersionFile.java | 4 +-
.../impl/TestContainerDeletionChoosingPolicy.java | 2 +-
.../common/impl/TestContainerPersistence.java | 47 --
.../container/common/impl/TestHddsDispatcher.java | 7 +-
.../common/statemachine/TestStateContext.java | 71 ++-
.../TestCreatePipelineCommandHandler.java | 36 +-
.../common/volume/TestStorageVolumeChecker.java | 12 +-
.../keyvalue/TestKeyValueBlockIterator.java | 10 +-
.../container/keyvalue/TestKeyValueContainer.java | 67 ++-
.../keyvalue/TestKeyValueContainerCheck.java | 158 +------
...a => TestKeyValueContainerIntegrityChecks.java} | 160 ++-----
.../TestKeyValueContainerMetadataInspector.java | 360 ++++++++++++++
.../container/keyvalue/TestKeyValueHandler.java | 19 +-
.../TestKeyValueHandlerWithUnhealthyContainer.java | 14 -
.../keyvalue/impl/CommonChunkManagerTestCases.java | 1 -
.../keyvalue/impl/TestBlockManagerImpl.java | 57 +--
.../container/ozoneimpl/TestContainerReader.java | 2 +-
.../container/ozoneimpl/TestOzoneContainer.java | 6 +-
.../replication/TestMeasuredReplicator.java | 15 +
.../testutils/BlockDeletingServiceTestImpl.java | 2 +-
hadoop-hdds/dev-support/checkstyle/checkstyle.xml | 1 +
hadoop-hdds/docs/content/concept/Containers.md | 3 +-
hadoop-hdds/docs/content/concept/Datanodes.md | 4 +-
hadoop-hdds/docs/content/concept/OzoneManager.md | 6 +-
hadoop-hdds/docs/content/concept/Recon.md | 9 +-
hadoop-hdds/docs/content/feature/OM-HA.md | 4 +-
hadoop-hdds/docs/content/feature/PrefixFSO.md | 68 ++-
hadoop-hdds/docs/content/feature/SCM-HA.md | 2 +-
hadoop-hdds/docs/content/interface/O3fs.md | 2 +-
hadoop-hdds/docs/content/interface/O3fs.zh.md | 4 +-
hadoop-hdds/docs/content/interface/Ofs.md | 2 +-
hadoop-hdds/docs/content/tools/TestTools.md | 4 +-
hadoop-hdds/docs/content/tools/TestTools.zh.md | 4 +-
hadoop-hdds/docs/dev-support/bin/generate-site.sh | 12 +-
.../docs/dev-support/bin/make_images_responsive.py | 57 +++
.../themes/ozonedoc/layouts/shortcodes/image.html | 2 +-
hadoop-hdds/framework/pom.xml | 8 +
.../hadoop/hdds/protocol/SCMSecurityProtocol.java | 12 +
.../SCMSecurityProtocolClientSideTranslatorPB.java | 37 ++
...lockLocationProtocolClientSideTranslatorPB.java | 4 +-
...inerLocationProtocolClientSideTranslatorPB.java | 9 +-
.../scm/update/client/UpdateServiceConfig.java | 5 +-
.../authority/PKIProfiles/DefaultProfile.java | 32 +-
...ateClient.java => CommonCertificateClient.java} | 116 ++---
.../certificate/client/OMCertificateClient.java | 79 +---
.../client/ReconCertificateClient.java} | 38 +-
...va => FixedThreadPoolWithAffinityExecutor.java} | 70 ++-
.../hadoop/hdds/utils/db/DBUpdatesWrapper.java | 9 +
.../org/apache/hadoop/hdds/utils/db/RDBStore.java | 1 +
.../hadoop/hdds/utils/db/cache/EpochEntry.java | 75 ---
.../hadoop/hdds/utils/db/cache/FullTableCache.java | 50 +-
.../hdds/utils/db/cache/PartialTableCache.java | 56 +--
.../hadoop/hdds/utils/db/cache/TableCache.java | 4 +-
.../client/TestDefaultCertificateClient.java | 2 +-
.../hadoop/hdds/server/events/TestEventQueue.java | 35 +-
.../hadoop/hdds/server/http/TestHtmlQuoting.java | 5 +-
.../hadoop/hdds/utils/db/cache/TestTableCache.java | 77 ++-
hadoop-hdds/interface-client/pom.xml | 5 +
.../src/main/proto/DatanodeClientProtocol.proto | 4 +-
.../interface-client/src/main/proto/hdds.proto | 7 +
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 11 +-
.../src/main/proto/ScmServerSecurityProtocol.proto | 8 +-
.../hadoop/hdds/scm/SCMCommonPlacementPolicy.java | 2 +-
.../container/AbstractContainerReportHandler.java | 43 +-
.../hdds/scm/container/ContainerManager.java | 7 +
.../hdds/scm/container/ContainerManagerImpl.java | 78 +--
.../hdds/scm/container/ContainerReportHandler.java | 141 ++++--
.../hdds/scm/container/ContainerStateManager.java | 10 +-
.../scm/container/ContainerStateManagerImpl.java | 24 +-
.../IncrementalContainerReportHandler.java | 18 +-
.../hdds/scm/container/ReplicationManager.java | 4 +-
.../scm/container/balancer/ContainerBalancer.java | 133 ++++--
.../balancer/ContainerBalancerConfiguration.java | 23 +-
.../balancer/ContainerBalancerMetrics.java | 139 +++---
.../scm/container/states/ContainerStateMap.java | 27 +-
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 2 +-
.../apache/hadoop/hdds/scm/ha/HASecurityUtils.java | 37 +-
.../hadoop/hdds/scm/ha/InterSCMGrpcClient.java | 4 +-
.../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 8 +-
.../hdds/scm/ha/SCMDBCheckpointProvider.java | 2 +-
...ffer.java => SCMHADBTransactionBufferStub.java} | 8 +-
...MockSCMHAManager.java => SCMHAManagerStub.java} | 24 +-
.../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 4 +-
.../hadoop/hdds/scm/ha/SCMServiceManager.java | 3 +-
.../hdds/scm/metadata/MoveDataNodePairCodec.java | 6 +-
.../hadoop/hdds/scm/metadata/PipelineCodec.java | 6 +-
.../hdds/scm/node/DatanodeAdminMonitorImpl.java | 9 +-
.../hadoop/hdds/scm/node/DatanodeUsageInfo.java | 15 +-
.../apache/hadoop/hdds/scm/node/NodeManager.java | 18 +
.../hadoop/hdds/scm/node/NodeStateManager.java | 18 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 34 +-
.../hdds/scm/node/states/Node2ObjectsMap.java | 2 +-
.../hadoop/hdds/scm/node/states/NodeStateMap.java | 4 +-
.../hdds/scm/pipeline/PipelineManagerImpl.java | 4 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 54 +--
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 11 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 6 +-
.../hdds/scm/pipeline/RatisPipelineUtils.java | 3 +-
.../SCMSecurityProtocolServerSideTranslatorPB.java | 28 ++
...inerLocationProtocolServerSideTranslatorPB.java | 9 +-
.../scm/safemode/HealthyPipelineSafeModeRule.java | 2 +-
.../safemode/OneReplicaPipelineSafeModeRule.java | 2 +-
.../hdds/scm/server/SCMClientProtocolServer.java | 12 +-
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 20 +
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 12 +-
.../hadoop/hdds/scm/server/SCMPolicyProvider.java | 5 +-
.../hdds/scm/server/SCMSecurityProtocolServer.java | 16 +-
.../hadoop/hdds/scm/server/SCMStorageConfig.java | 2 +-
.../hdds/scm/server/StorageContainerManager.java | 51 +-
.../org/apache/hadoop/hdds/scm/HddsTestUtils.java | 14 +-
.../hadoop/hdds/scm/TestHddsServerUtils.java | 2 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 7 +-
.../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 8 +-
.../hadoop/hdds/scm/container/MockNodeManager.java | 21 +
.../hdds/scm/container/SimpleMockNodeManager.java | 20 +-
.../container/TestCloseContainerEventHandler.java | 8 +-
.../scm/container/TestContainerManagerImpl.java | 26 +-
.../scm/container/TestContainerReportHandler.java | 34 +-
.../scm/container/TestContainerStateManager.java | 14 +-
.../TestIncrementalContainerReportHandler.java | 30 +-
.../hdds/scm/container/TestReplicationManager.java | 140 +++---
.../scm/container/TestUnknownContainerReport.java | 4 +-
.../container/balancer/TestContainerBalancer.java | 84 ++--
.../states/TestContainerReplicaCount.java | 5 +-
.../hdds/scm/ha/TestReplicationAnnotation.java | 10 +-
.../hdds/scm/ha/TestSequenceIDGenerator.java | 4 +-
.../hdds/scm/node/TestContainerPlacement.java | 6 +-
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 7 +-
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 3 +-
.../hadoop/hdds/scm/node/TestNodeStateManager.java | 24 +
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 12 +-
.../hdds/scm/node/TestSCMNodeStorageStatMap.java | 5 +-
.../hdds/scm/pipeline/MockPipelineManager.java | 4 +-
.../TestPipelineDatanodesIntersection.java | 13 +-
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 129 ++---
.../scm/pipeline/TestPipelinePlacementPolicy.java | 158 ++++++-
.../scm/pipeline/TestPipelineStateManagerImpl.java | 62 +--
.../hdds/scm/pipeline/TestPipelineStateMap.java | 12 +-
.../scm/pipeline/TestRatisPipelineProvider.java | 57 ++-
.../scm/pipeline/TestSimplePipelineProvider.java | 18 +-
.../pipeline/TestWritableECContainerProvider.java | 16 +-
.../safemode/TestHealthyPipelineSafeModeRule.java | 20 +-
.../TestOneReplicaPipelineSafeModeRule.java | 10 +-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 22 +-
.../scm/server/TestSCMBlockProtocolServer.java | 10 +-
.../server/TestSCMDatanodeHeartbeatDispatcher.java | 54 +--
.../server/TestStorageContainerManagerStarter.java | 6 +-
.../testutils/ReplicationNodeManagerMock.java | 14 +
hadoop-hdds/test-utils/pom.xml | 5 +
.../org/apache/ozone/test/GenericTestUtils.java | 18 +-
.../java/org/apache/ozone/test/tag/Flaky.java} | 36 +-
.../main/java/org/apache/ozone/test/tag/Slow.java} | 35 +-
.../org/apache/ozone/test/tag/package-info.java} | 23 +-
.../scm/cli/ContainerBalancerStartSubcommand.java | 8 +-
.../hdds/scm/cli/ContainerOperationClient.java | 9 +-
.../hadoop/hdds/scm/cli/cert/CertCommands.java | 4 +-
.../hdds/scm/cli/datanode/ListInfoSubcommand.java | 38 +-
.../hdds/scm/cli/container/TestInfoSubCommand.java | 6 +-
.../datanode/TestContainerBalancerSubCommand.java | 6 +-
.../cli/pipeline/TestListPipelinesSubCommand.java | 12 +-
.../hadoop/ozone/client/OzoneClientFactory.java | 5 +-
.../client/checksum/BaseFileChecksumHelper.java | 54 ++-
.../checksum/ReplicatedFileChecksumHelper.java | 44 +-
.../client/io/BlockOutputStreamEntryPool.java | 1 -
.../hadoop/ozone/client/io/ECBlockInputStream.java | 7 +-
.../client/io/MultipartCryptoKeyInputStream.java | 4 +
.../ozone/client/protocol/ClientProtocol.java | 4 +-
.../apache/hadoop/ozone/client/rpc/RpcClient.java | 46 +-
.../checksum/TestReplicatedFileChecksumHelper.java | 39 +-
.../hadoop/ozone/client/rpc/RpcClientTest.java | 279 +++++------
.../hadoop/ozone/client/rpc/TestOzoneKMSUtil.java | 3 +-
.../org/apache/hadoop/ozone/audit/OMAction.java | 4 +
.../org/apache/hadoop/ozone/om/OMConfigKeys.java | 15 +
.../ozone/om/ha/OMFailoverProxyProvider.java | 130 ++---
.../apache/hadoop/ozone/om/helpers/DBUpdates.java | 10 +
.../hadoop/ozone/om/helpers/OmDirectoryInfo.java | 7 +-
.../apache/hadoop/ozone/om/helpers/OmKeyInfo.java | 16 +-
.../ozone/om/helpers/OmKeyLocationInfoGroup.java | 3 +-
.../hadoop/ozone/om/helpers/OmVolumeArgs.java | 4 +-
.../hadoop/ozone/om/helpers/ServiceInfo.java | 25 +-
.../ozone/om/protocolPB/Hadoop3OmTransport.java | 3 +-
...OzoneManagerProtocolClientSideTranslatorPB.java | 21 +-
.../apache/hadoop/ozone/protocolPB/OMPBHelper.java | 37 +-
.../apache/hadoop/ozone/security/acl/OzoneObj.java | 3 +-
.../apache/hadoop/ozone/web/utils/OzoneUtils.java | 18 +-
.../org/apache/hadoop/ozone/TestOzoneAcls.java | 10 +-
.../ozone/om/ha/TestOMFailoverProxyProvider.java | 8 +-
.../hadoop/ozone/om/helpers/TestOmBucketInfo.java | 18 +-
.../hadoop/ozone/om/helpers/TestOmKeyInfo.java | 24 +-
.../ozone/om/helpers/TestOmMultipartKeyInfo.java | 5 +-
.../hadoop/ozone/om/helpers/TestQuotaUtil.java | 5 +-
.../ozone/security/acl/TestOzoneObjInfo.java | 5 +-
hadoop-ozone/csi/pom.xml | 4 +
hadoop-ozone/datanode/pom.xml | 1 +
hadoop-ozone/dev-support/checks/_lib.sh | 2 +-
.../dev-support/checks/_mvn_unit_report.sh | 24 +-
hadoop-ozone/dev-support/checks/acceptance.sh | 3 +-
hadoop-ozone/dev-support/checks/integration.sh | 2 +-
hadoop-ozone/dev-support/checks/rat.sh | 4 +-
hadoop-ozone/dist/pom.xml | 2 +-
.../dist/src/main/compose/ozone-topology/test.sh | 6 -
.../dist/src/main/compose/ozone/docker-config | 4 +
.../dist/src/main/compose/ozone/prometheus.yml | 1 +
hadoop-ozone/dist/src/main/compose/ozone/test.sh | 4 +
.../src/main/compose/ozonesecure/docker-config | 4 +
.../dist/src/main/compose/ozonesecure/test.sh | 2 +-
hadoop-ozone/dist/src/main/compose/testlib.sh | 98 +++-
.../dist/src/main/compose/upgrade/testlib.sh | 9 +-
.../non-rolling-upgrade/1.1.0-1.2.0/callback.sh | 6 +
.../dist/src/main/compose/xcompat/clients.yaml | 18 +
hadoop-ozone/dist/src/main/compose/xcompat/test.sh | 17 +-
hadoop-ozone/dist/src/main/k8s/README.md | 68 +++
.../k8s/definitions/ozone-csi/csi-controller.yaml | 2 +-
.../main/k8s/examples/getting-started/Flekszible | 2 +
.../examples/getting-started/config-configmap.yaml | 1 +
.../kustomization.yaml} | 30 +-
.../dist/src/main/k8s/examples/minikube/Flekszible | 2 +
.../k8s/examples/minikube/config-configmap.yaml | 1 +
.../Flekszible => minikube/kustomization.yaml} | 30 +-
.../src/main/k8s/examples/ozone-dev/Flekszible | 1 +
.../k8s/examples/ozone-dev/config-configmap.yaml | 1 +
.../examples/ozone-dev/kustomization.yaml} | 42 +-
.../dist/src/main/k8s/examples/ozone-ha/Flekszible | 3 +
.../ozone-ha/{Flekszible => kustomization.yaml} | 26 +-
.../dist/src/main/k8s/examples/ozone/Flekszible | 3 +-
.../main/k8s/examples/ozone/config-configmap.yaml | 1 +
.../Flekszible => ozone/kustomization.yaml} | 26 +-
hadoop-ozone/dist/src/main/license/jar-report.txt | 1 +
.../main/smoketest/auditparser/auditparser.robot | 22 +-
.../dist/src/main/smoketest/basic/basic.robot | 9 +-
.../debug/ozone-debug-corrupt-block.robot | 49 ++
.../ozone-debug-dead-datanode.robot} | 35 +-
.../debug/ozone-debug-stale-datanode.robot | 48 ++
.../main/smoketest/debug/ozone-debug-tests.robot | 51 ++
.../src/main/smoketest/debug/ozone-debug.robot | 93 +++-
.../dist/src/main/smoketest/freon/generate.robot | 19 +-
.../dist/src/main/smoketest/freon/remove.robot | 21 +-
.../dist/src/main/smoketest/freon/validate.robot | 13 +-
.../dist/src/main/smoketest/omha/om-prepare.robot | 3 +-
.../dist/src/main/smoketest/ozone-lib/freon.robot | 65 +++
.../dist/src/main/smoketest/recon/recon-api.robot | 6 +-
.../src/main/smoketest/s3/MultipartUpload.robot | 21 +-
.../dist/src/main/smoketest/s3/bucketlist.robot | 7 +-
.../dist/src/main/smoketest/s3/commonawslib.robot | 36 ++
.../dist/src/main/smoketest/spnego/web.robot | 4 -
.../dist/src/main/smoketest/upgrade/generate.robot | 5 +-
.../fault-injection-test/mini-chaos-tests/pom.xml | 25 +
.../hadoop/ozone/TestAllMiniChaosOzoneCluster.java | 2 +-
.../hadoop/ozone/insight/TestBaseInsightPoint.java | 7 +-
hadoop-ozone/integration-test/pom.xml | 5 +
.../ozone/TestDirectoryDeletingServiceWithFSO.java | 26 +-
.../hadoop/fs/ozone/TestOzoneFileSystem.java | 4 +-
.../fs/ozone/TestOzoneFileSystemWithLinks.java | 14 +-
.../hadoop/fs/ozone/TestRootedOzoneFileSystem.java | 2 +-
.../hadoop/hdds/scm/TestRatisPipelineLeader.java | 23 +-
.../hdds/scm/TestSCMDbCheckpointServlet.java | 7 +-
.../hadoop/hdds/scm/TestSCMInstallSnapshot.java | 19 +-
.../apache/hadoop/hdds/scm/TestSCMSnapshot.java | 5 +-
.../TestContainerStateManagerIntegration.java | 44 +-
.../metrics/TestSCMContainerManagerMetrics.java | 4 +-
.../hdds/scm/pipeline/TestLeaderChoosePolicy.java | 28 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 2 +-
.../hadoop/hdds/scm/pipeline/TestNodeFailure.java | 2 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 29 +-
.../TestRatisPipelineCreateAndDestroy.java | 31 +-
.../hadoop/hdds/scm/pipeline/TestSCMRestart.java | 6 +-
.../safemode/TestSCMSafeModeWithPipelineRules.java | 7 +-
.../hadoop/hdds/upgrade/TestHDDSUpgrade.java | 33 +-
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 8 +-
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 20 +-
.../hadoop/ozone/MiniOzoneHAClusterImpl.java | 2 +-
.../org/apache/hadoop/ozone/RatisTestHelper.java | 5 +-
...OutputUtil.java => StandardOutputTestBase.java} | 2 +-
.../hadoop/ozone/TestContainerOperations.java | 35 +-
.../apache/hadoop/ozone/TestMiniOzoneCluster.java | 36 +-
.../hadoop/ozone/TestMiniOzoneOMHACluster.java | 8 +-
.../hadoop/ozone/TestOzoneConfigurationFields.java | 6 +-
.../hadoop/ozone/TestSecureOzoneCluster.java | 13 +-
.../hadoop/ozone/TestStorageContainerManager.java | 5 +-
.../ozone/client/TestOzoneClientFactory.java | 75 +++
.../apache/hadoop/ozone/client/rpc/TestBCSID.java | 7 +-
.../rpc/TestBlockOutputStreamWithFailures.java | 23 +-
...estBlockOutputStreamWithFailuresFlushDelay.java | 23 +-
.../rpc/TestCloseContainerHandlingByClient.java | 14 +-
.../client/rpc/TestContainerStateMachine.java | 4 +-
.../TestContainerStateMachineFailureOnRead.java | 21 +-
.../rpc/TestContainerStateMachineFailures.java | 13 +-
.../rpc/TestContainerStateMachineFlushDelay.java | 14 +-
.../client/rpc/TestDeleteWithSlowFollower.java | 10 +-
.../ozone/client/rpc/TestECKeyOutputStream.java | 8 +-
.../client/rpc/TestFailureHandlingByClient.java | 144 +++++-
.../rpc/TestFailureHandlingByClientFlushDelay.java | 8 +-
.../rpc/TestMultiBlockWritesWithDnFailures.java | 10 +-
.../client/rpc/TestOzoneAtRestEncryption.java | 28 +-
.../ozone/client/rpc/TestOzoneRpcClient.java | 18 +-
.../client/rpc/TestOzoneRpcClientAbstract.java | 41 +-
.../rpc/TestOzoneRpcClientForAclAuditLog.java | 3 +-
.../client/rpc/TestOzoneRpcClientWithRatis.java | 10 +-
.../ozone/client/rpc/TestSecureOzoneRpcClient.java | 20 +-
.../ozone/client/rpc/TestWatchForCommit.java | 12 +-
.../rpc/read/TestBlockInputStreamFactoryImpl.java | 2 +-
.../ozone/client/rpc/read/TestInputStreamBase.java | 4 +-
.../ozone/client/rpc/read/TestKeyInputStream.java | 2 +-
.../ozone/container/TestContainerReplication.java | 2 +-
.../apache/hadoop/ozone/container/TestHelper.java | 8 +-
.../commandhandler/TestBlockDeletion.java | 21 +-
.../TestCloseContainerByPipeline.java | 24 +-
.../commandhandler/TestCloseContainerHandler.java | 2 +-
.../commandhandler/TestDeleteContainerHandler.java | 2 +-
...ler.java => TestRefreshVolumeUsageHandler.java} | 121 +++--
.../transport/server/ratis/TestCSMMetrics.java | 3 +-
.../container/ozoneimpl/TestOzoneContainer.java | 154 +++---
.../ozoneimpl/TestOzoneContainerWithTLS.java | 6 +-
.../container/server/TestContainerServer.java | 23 +-
.../server/TestSecureContainerServer.java | 10 -
.../hadoop/ozone/freon/TestRandomKeyGenerator.java | 15 +-
.../hadoop/ozone/fsck/TestContainerMapper.java | 16 +-
.../hadoop/ozone/om/TestAddRemoveOzoneManager.java | 19 +-
.../apache/hadoop/ozone/om/TestBucketOwner.java | 27 +-
.../ozone/om/TestContainerReportWithKeys.java | 7 +-
.../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 36 +-
.../hadoop/ozone/om/TestOMDbCheckpointServlet.java | 2 -
.../apache/hadoop/ozone/om/TestObjectStore.java | 12 +-
.../org/apache/hadoop/ozone/om/TestOmAcls.java | 82 +++-
.../hadoop/ozone/om/TestOmBlockVersioning.java | 11 +-
.../org/apache/hadoop/ozone/om/TestOmInit.java | 7 +-
.../org/apache/hadoop/ozone/om/TestOmLDBCli.java | 5 +-
.../org/apache/hadoop/ozone/om/TestOmMetrics.java | 4 +-
.../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 25 +-
.../ozone/om/TestOzoneManagerHAKeyDeletion.java | 2 +-
.../ozone/om/TestOzoneManagerHAMetadataOnly.java | 40 +-
.../hadoop/ozone/om/TestOzoneManagerHAWithACL.java | 2 +-
.../ozone/om/TestOzoneManagerHAWithData.java | 62 +--
.../ozone/om/TestOzoneManagerHAWithFailover.java | 4 +-
.../ozone/om/TestOzoneManagerListVolumes.java | 2 -
.../hadoop/ozone/om/TestOzoneManagerPrepare.java | 14 +-
.../hadoop/ozone/om/TestOzoneManagerRestart.java | 49 +-
.../apache/hadoop/ozone/om/TestScmSafeMode.java | 2 +-
.../hadoop/ozone/om/TestSecureOzoneManager.java | 9 +-
.../hadoop/ozone/recon/TestReconAsPassiveScm.java | 8 +-
.../hadoop/ozone/recon/TestReconScmSnapshot.java | 2 +-
.../apache/hadoop/ozone/recon/TestReconTasks.java | 2 +-
.../ozone/recon/TestReconWithOzoneManager.java | 7 +-
.../ozone/recon/TestReconWithOzoneManagerHA.java | 2 +-
.../hadoop/ozone/scm/TestFailoverWithSCMHA.java | 5 +-
.../TestSCMContainerPlacementPolicyMetrics.java | 18 +-
.../ozone/scm/TestSCMInstallSnapshotWithHA.java | 4 +-
.../org/apache/hadoop/ozone/scm/TestSCMMXBean.java | 4 +-
.../ozone/scm/TestStorageContainerManagerHA.java | 2 +-
.../hadoop/ozone/scm/TestXceiverClientGrpc.java | 2 +-
.../scm/node/TestDecommissionAndMaintenance.java | 27 +-
.../ozone/scm/pipeline/TestSCMPipelineMetrics.java | 2 +-
.../hadoop/ozone/shell/TestNSSummaryAdmin.java | 4 +-
hadoop-ozone/interface-client/pom.xml | 4 +
.../src/main/proto/OmClientProtocol.proto | 3 +-
.../apache/hadoop/ozone/om/OMMetadataManager.java | 9 +-
.../hadoop/ozone/om/codec/OmKeyInfoCodec.java | 6 +-
.../ozone/om/codec/RepeatedOmKeyInfoCodec.java | 6 +-
.../hadoop/ozone/om/codec/TestOmKeyInfoCodec.java | 4 +-
.../om/codec/TestOmMultipartKeyInfoCodec.java | 3 +-
.../ozone/om/codec/TestRepeatedOmKeyInfoCodec.java | 3 +-
hadoop-ozone/ozone-manager/pom.xml | 5 +
.../org/apache/hadoop/ozone/om/BucketManager.java | 22 -
.../apache/hadoop/ozone/om/BucketManagerImpl.java | 255 ----------
.../hadoop/ozone/om/DirectoryDeletingService.java | 7 +-
.../org/apache/hadoop/ozone/om/KeyManager.java | 9 +-
.../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 113 +++--
.../java/org/apache/hadoop/ozone/om/OMMetrics.java | 20 +
.../hadoop/ozone/om/OmMetadataManagerImpl.java | 29 +-
.../hadoop/ozone/om/OpenKeyCleanupService.java | 3 +-
.../org/apache/hadoop/ozone/om/OzoneManager.java | 10 +-
.../apache/hadoop/ozone/om/OzoneManagerUtils.java | 22 +
.../hadoop/ozone/om/OzonePrefixPathImpl.java | 20 +
.../hadoop/ozone/om/TrashOzoneFileSystem.java | 1 +
.../hadoop/ozone/om/codec/OMDBDefinition.java | 10 +-
.../ozone/om/ratis/OzoneManagerRatisServer.java | 15 +-
.../hadoop/ozone/om/request/OMClientRequest.java | 15 +-
.../om/request/bucket/OMBucketCreateRequest.java | 12 +-
.../om/request/bucket/OMBucketDeleteRequest.java | 9 +-
.../om/request/bucket/acl/OMBucketAclRequest.java | 4 +-
.../file/OMDirectoryCreateRequestWithFSO.java | 5 +-
.../ozone/om/request/file/OMFileRequest.java | 4 +-
.../ozone/om/request/key/OMKeyCommitRequest.java | 52 +-
.../om/request/key/OMKeyCommitRequestWithFSO.java | 24 +-
.../om/request/key/OMKeyDeleteRequestWithFSO.java | 2 +-
.../om/request/key/OMKeyRenameRequestWithFSO.java | 2 +-
.../hadoop/ozone/om/request/key/OMKeyRequest.java | 40 +-
.../om/request/key/OMOpenKeysDeleteRequest.java | 42 +-
.../ozone/om/request/key/acl/OMKeyAclRequest.java | 2 +-
.../om/request/key/acl/OMKeyAclRequestWithFSO.java | 2 +-
.../S3MultipartUploadCompleteRequest.java | 64 ++-
.../S3MultipartUploadCompleteRequestWithFSO.java | 12 +-
.../security/OMCancelDelegationTokenRequest.java | 42 +-
.../security/OMGetDelegationTokenRequest.java | 54 ++-
.../security/OMRenewDelegationTokenRequest.java | 51 +-
.../validation/RequestFeatureValidator.java | 99 ++++
.../request/validation/RequestProcessingPhase.java | 36 +-
.../om/request/validation/RequestValidations.java | 107 +++++
.../om/request/validation/ValidationCondition.java | 55 +++
.../om/request/validation/ValidationContext.java | 52 ++
.../om/request/validation/ValidatorRegistry.java | 201 ++++++++
.../ozone/om/request/validation/package-info.java | 62 +++
.../om/response/bucket/OMBucketCreateResponse.java | 3 +-
.../om/response/bucket/OMBucketDeleteResponse.java | 3 +-
.../response/file/OMFileCreateResponseWithFSO.java | 4 +-
.../om/response/key/OMAllocateBlockResponse.java | 3 +-
.../key/OMAllocateBlockResponseWithFSO.java | 3 +-
.../ozone/om/response/key/OMKeyCommitResponse.java | 4 +-
.../response/key/OMKeyCommitResponseWithFSO.java | 4 +-
.../ozone/om/response/key/OMKeyCreateResponse.java | 3 +-
.../response/key/OMKeyCreateResponseWithFSO.java | 4 +-
.../ozone/om/response/key/OMKeyDeleteResponse.java | 3 +-
.../response/key/OMKeyDeleteResponseWithFSO.java | 3 +-
.../response/key/OMKeyRenameResponseWithFSO.java | 3 +-
.../om/response/key/OMKeysDeleteResponse.java | 4 +-
.../om/response/key/OMKeysRenameResponse.java | 10 +
.../om/response/key/OMOpenKeysDeleteResponse.java | 18 +-
.../multipart/S3MultipartUploadAbortResponse.java | 9 +-
.../S3MultipartUploadCommitPartResponse.java | 9 +-
.../S3MultipartUploadCompleteResponse.java | 36 +-
.../S3MultipartUploadCompleteResponseWithFSO.java | 9 +-
...OzoneManagerProtocolServerSideTranslatorPB.java | 25 +-
.../protocolPB/OzoneManagerRequestHandler.java | 2 +
.../org/apache/hadoop/ozone/om/OmTestManagers.java | 11 +
.../ozone/om/ScmBlockLocationTestingClient.java | 2 +-
.../hadoop/ozone/om/TestBucketManagerImpl.java | 243 +++++-----
.../hadoop/ozone/om/TestKeyDeletingService.java | 2 +-
.../apache/hadoop/ozone/om/TestKeyManagerUnit.java | 14 +-
.../hadoop/ozone/om/TestOmMetadataManager.java | 50 +-
.../hadoop/ozone/om/TestOzoneConfigUtil.java | 2 +-
.../hadoop/ozone/om/TestOzoneManagerStarter.java | 4 +-
.../apache/hadoop/ozone/om/TestTrashService.java | 4 +-
.../ozone/om/request/OMRequestTestUtils.java | 16 +
.../bucket/TestOMBucketCreateRequestWithFSO.java | 4 +
.../bucket/TestOMBucketDeleteRequestWithFSO.java | 76 +++
.../request/key/TestOMKeyDeleteRequestWithFSO.java | 97 +++-
.../ozone/om/request/key/TestOMKeyRequest.java | 2 +-
.../request/key/TestOMOpenKeysDeleteRequest.java | 174 ++++---
.../TestS3MultipartUploadCompleteRequest.java | 32 +-
.../security/TestOMGetDelegationTokenRequest.java | 10 +-
.../TestRequestFeatureValidatorProcessor.java | 524 +++++++++++++++++++++
.../request/validation/TestRequestValidations.java | 349 ++++++++++++++
.../request/validation/TestValidatorRegistry.java | 215 +++++++++
.../GeneralValidatorsForTesting.java | 190 ++++++++
.../ValidatorsForOnlyOldClientValidations.java | 43 ++
.../om/response/key/TestOMKeyDeleteResponse.java | 3 +-
.../response/key/TestOMOpenKeysDeleteResponse.java | 61 ++-
.../s3/multipart/TestS3MultipartResponse.java | 19 +-
...stS3MultipartUploadCompleteResponseWithFSO.java | 17 +-
.../ozone/security/TestAWSV4AuthValidator.java | 2 +-
.../security/acl/TestOzoneNativeAuthorizer.java | 8 +-
.../hadoop/ozone/security/acl/TestParentAcl.java | 2 +-
.../hadoop/ozone/security/acl/TestVolumeOwner.java | 2 +-
.../apache/hadoop/fs/ozone/OzoneClientUtils.java | 2 +-
.../hadoop/fs/ozone/TestOzoneClientUtils.java | 4 +-
hadoop-ozone/ozonefs-hadoop2/pom.xml | 1 +
.../hadoop/fs/ozone/Hadoop27RpcTransport.java | 3 +-
hadoop-ozone/ozonefs-hadoop3/pom.xml | 1 +
hadoop-ozone/ozonefs-shaded/pom.xml | 5 +
hadoop-ozone/recon-codegen/pom.xml | 4 +
.../hadoop/ozone/recon/ReconControllerModule.java | 2 +
.../org/apache/hadoop/ozone/recon/ReconServer.java | 125 +++++
.../org/apache/hadoop/ozone/recon/ReconUtils.java | 29 ++
.../ozone/recon/api/ClusterStateEndpoint.java | 15 +-
.../recon/metrics/OzoneManagerSyncMetrics.java | 12 +
.../recon/metrics/ReconTaskStatusMetrics.java | 83 ++++
.../hadoop/ozone/recon/scm/ReconNodeManager.java | 18 +-
.../ozone/recon/scm/ReconPipelineManager.java | 4 +-
.../hadoop/ozone/recon/scm/ReconStorageConfig.java | 61 ++-
.../scm/ReconStorageContainerManagerFacade.java | 11 +-
.../spi/impl/OzoneManagerServiceProviderImpl.java | 14 +-
.../impl/StorageContainerServiceProviderImpl.java | 20 +-
.../ozone/recon/OMMetadataManagerTestUtils.java | 12 +-
.../ozone/recon/api/TestContainerEndpoint.java | 2 +-
.../hadoop/ozone/recon/api/TestEndpoints.java | 7 +-
.../ozone/recon/api/TestOpenContainerCount.java | 16 +-
.../recon/fsck/TestContainerHealthStatus.java | 4 +-
.../ozone/recon/fsck/TestContainerHealthTask.java | 6 +-
.../TestContainerHealthTaskRecordGenerator.java | 3 +-
.../recovery/TestReconOmMetadataManagerImpl.java | 4 +-
.../scm/AbstractReconContainerManagerTest.java | 20 +-
.../ozone/recon/scm/TestReconContainerManager.java | 2 +-
.../ozone/recon/scm/TestReconNodeManager.java | 12 +-
.../ozone/recon/scm/TestReconPipelineManager.java | 15 +-
.../impl/TestReconNamespaceSummaryManagerImpl.java | 6 +-
.../recon/tasks/TestContainerKeyMapperTask.java | 4 +-
.../ozone/recon/tasks/TestNSSummaryTask.java | 4 +-
.../ozone/recon/tasks/TestOMDBUpdatesHandler.java | 4 +-
.../java/org/apache/hadoop/ozone/s3/Gateway.java | 9 +-
.../hadoop/ozone/s3/OzoneClientProducer.java | 24 +-
.../hadoop/ozone/s3/endpoint/BucketEndpoint.java | 13 +
.../hadoop/ozone/s3/endpoint/EndpointBase.java | 12 +-
.../ozone/s3/endpoint/ListBucketResponse.java | 6 +-
.../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 34 +-
.../hadoop/ozone/s3/endpoint/RootEndpoint.java | 9 +-
.../hadoop/ozone/s3/exception/S3ErrorTable.java | 9 +
.../hadoop/ozone/s3/metrics/S3GatewayMetrics.java | 320 +++++++++++++
.../hadoop/ozone/s3/metrics/package-info.java | 23 +-
.../hadoop/ozone/client/OzoneBucketStub.java | 2 +-
.../ozone/s3/metrics/TestS3GatewayMetrics.java | 113 +++++
.../apache/hadoop/ozone/s3/util/TestS3Utils.java | 4 +-
.../apache/hadoop/ozone/debug/ChunkKeyHandler.java | 3 +-
.../apache/hadoop/ozone/debug/ReadReplicas.java | 5 +-
.../ozone/freon/LeaderAppendLogEntryGenerator.java | 2 +-
.../apache/hadoop/ozone/freon/OmKeyGenerator.java | 2 +-
.../hadoop/ozone/freon/SCMThroughputBenchmark.java | 2 +-
.../freon/containergenerator/GeneratorOm.java | 5 +-
.../freon/containergenerator/GeneratorScm.java | 4 +-
.../apache/hadoop/ozone/fsck/ContainerMapper.java | 5 +-
.../apache/hadoop/ozone/freon/TestProgressBar.java | 6 +-
pom.xml | 85 +++-
580 files changed, 11454 insertions(+), 4601 deletions(-)
diff --cc hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
index 2e49f02,a6f22d4..07a444a
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockInputStream.java
@@@ -212,10 -219,9 +212,10 @@@ public class BlockInputStream extends B
protected List<ChunkInfo> getChunkInfos() throws IOException {
// irrespective of the container state, we will always read via Standalone
// protocol.
- if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE) {
+ if (pipeline.getType() != HddsProtos.ReplicationType.STAND_ALONE && pipeline
+ .getType() != HddsProtos.ReplicationType.EC) {
pipeline = Pipeline.newBuilder(pipeline)
- .setReplicationConfig(new StandaloneReplicationConfig(
+ .setReplicationConfig(StandaloneReplicationConfig.getInstance(
ReplicationConfig
.getLegacyFactor(pipeline.getReplicationConfig())))
.build();
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
index 1e2264b,0071924..e99b6a5
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerInfo.java
@@@ -25,7 -25,7 +25,8 @@@ import java.time.Instant
import java.util.Arrays;
import java.util.Comparator;
+ import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 51c0446,fa87ab2..4b7fda9
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@@ -466,19 -449,12 +454,19 @@@ public final class OzoneConfigKeys
public static final String OZONE_CLIENT_TEST_OFS_BUCKET_LAYOUT_DEFAULT =
"FILE_SYSTEM_OPTIMIZED";
- public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY =
- "ozone.om.client.protocol.version";
- // The version of the protocol for Client (S3G/OFS) to OM Communication.
- // The protocol starts at 2.0.0 and a null or empty value for older versions.
- public static final String OZONE_OM_CLIENT_PROTOCOL_VERSION = "2.0.0";
+ public static final String OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY =
+ "ozone.client.required.om.version.min";
+
+ public static final String OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT =
+ OzoneManagerVersion.S3G_PERSISTENT_CONNECTIONS.name();
+ public static final String
+ OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_MS =
+ "ozone.client.bucket.replication.config.refresh.time.ms";
+ public static final long
+ OZONE_CLIENT_BUCKET_REPLICATION_CONFIG_REFRESH_PERIOD_DEFAULT_MS =
+ 300 * 1000;
+
/**
* There is no need to instantiate this class.
*/
diff --cc hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
index 4d874da,0000000..9ec19bf
mode 100644,000000..100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerInfo.java
@@@ -1,86 -1,0 +1,86 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Tests for the ContainerInfo class.
+ */
+
+public class TestContainerInfo {
+
+ @Test
+ public void getProtobufMessageEC() throws IOException {
+ ContainerInfo container =
- createContainerInfo(new RatisReplicationConfig(THREE));
++ createContainerInfo(RatisReplicationConfig.getInstance(THREE));
+ HddsProtos.ContainerInfoProto proto = container.getProtobuf();
+
+ // No EC Config
+ Assert.assertFalse(proto.hasEcReplicationConfig());
+ Assert.assertEquals(THREE, proto.getReplicationFactor());
+ Assert.assertEquals(RATIS, proto.getReplicationType());
+
+ // Reconstruct object from Proto
+ ContainerInfo recovered = ContainerInfo.fromProtobuf(proto);
+ Assert.assertEquals(RATIS, recovered.getReplicationType());
+ Assert.assertTrue(
+ recovered.getReplicationConfig() instanceof RatisReplicationConfig);
+
+ // EC Config
+ container = createContainerInfo(new ECReplicationConfig(3, 2));
+ proto = container.getProtobuf();
+
+ Assert.assertEquals(3, proto.getEcReplicationConfig().getData());
+ Assert.assertEquals(2, proto.getEcReplicationConfig().getParity());
+ Assert.assertFalse(proto.hasReplicationFactor());
+ Assert.assertEquals(EC, proto.getReplicationType());
+
+ // Reconstruct object from Proto
+ recovered = ContainerInfo.fromProtobuf(proto);
+ Assert.assertEquals(EC, recovered.getReplicationType());
+ Assert.assertTrue(
+ recovered.getReplicationConfig() instanceof ECReplicationConfig);
+ ECReplicationConfig config =
+ (ECReplicationConfig)recovered.getReplicationConfig();
+ Assert.assertEquals(3, config.getData());
+ Assert.assertEquals(2, config.getParity());
+ }
+
+ private ContainerInfo createContainerInfo(ReplicationConfig repConfig) {
+ ContainerInfo.Builder builder = new ContainerInfo.Builder();
+ builder.setContainerID(1234)
+ .setReplicationConfig(repConfig)
+ .setPipelineID(PipelineID.randomId())
+ .setState(HddsProtos.LifeCycleState.OPEN)
+ .setOwner("scm");
+ return builder.build();
+ }
+}
diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
index 72e9b92,ba131ff..fdf44ed
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java
@@@ -681,10 -751,9 +751,10 @@@ public class KeyValueContainer implemen
.setWriteCount(containerData.getWriteCount())
.setReadBytes(containerData.getReadBytes())
.setWriteBytes(containerData.getWriteBytes())
- .setKeyCount(containerData.getKeyCount())
+ .setKeyCount(containerData.getBlockCount())
.setUsed(containerData.getBytesUsed())
.setState(getHddsState())
+ .setReplicaIndex(containerData.getReplicaIndex())
.setDeleteTransactionId(containerData.getDeleteTransactionId())
.setBlockCommitSequenceId(containerData.getBlockCommitSequenceId())
.setOriginNodeId(containerData.getOriginNodeId());
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index 0ba9f3b,e1c19f7..a14be22
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@@ -115,9 -113,6 +116,8 @@@ import java.util.Map
import java.util.Optional;
import java.util.function.Consumer;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
- import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
+
/**
* This class is the client-side translator to translate the requests made on
* the {@link StorageContainerLocationProtocol} interface to the RPC server
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index b46bc79,9b94e20..dd9f71a
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@@ -110,11 -129,10 +130,9 @@@ public class AbstractContainerReportHan
* @throws ContainerNotFoundException If the container is not present
*/
private void updateContainerStats(final DatanodeDetails datanodeDetails,
- final ContainerID containerId,
+ final ContainerInfo containerInfo,
final ContainerReplicaProto replicaProto)
throws ContainerNotFoundException {
- final ContainerInfo containerInfo = containerManager
- .getContainer(containerId);
- final ContainerID containerId = containerInfo.containerID();
if (isHealthy(replicaProto::getState)) {
if (containerInfo.getSequenceId() <
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
index e62a8b4,1c39efd..e4076d9
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java
@@@ -255,19 -270,13 +272,19 @@@ public class ContainerManagerImpl imple
.setOwner(owner)
.setContainerID(containerID.getId())
.setDeleteTransactionId(0)
- .setReplicationFactor(
- ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()))
- .setReplicationType(pipeline.getType())
- .build();
- containerStateManager.addContainer(containerInfo);
+ .setReplicationType(pipeline.getType());
+
+ if (pipeline.getReplicationConfig() instanceof ECReplicationConfig) {
+ containerInfoBuilder.setEcReplicationConfig(
+ ((ECReplicationConfig) pipeline.getReplicationConfig()).toProto());
+ } else {
+ containerInfoBuilder.setReplicationFactor(
+ ReplicationConfig.getLegacyFactor(pipeline.getReplicationConfig()));
+ }
+
+ containerStateManager.addContainer(containerInfoBuilder.build());
scmContainerManagerMetrics.incNumSuccessfulCreateContainers();
- return containerStateManager.getContainer(containerID.getProtobuf());
+ return containerStateManager.getContainer(containerID);
}
@Override
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index bba6edb,2168138..19b7a51
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@@ -177,10 -167,9 +177,10 @@@ public class PipelineManagerImpl implem
acquireWriteLock();
try {
- Pipeline pipeline = pipelineFactory.create(replicationConfig);
+ Pipeline pipeline = pipelineFactory.create(replicationConfig,
+ excludedNodes, favoredNodes);
stateManager.addPipeline(pipeline.getProtobufMessage(
- ClientVersions.CURRENT_VERSION));
+ ClientVersion.CURRENT_VERSION));
recordMetricsForPipeline(pipeline);
return pipeline;
} catch (IOException ex) {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index 470136a,d1e97fe..ee0b637
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@@ -155,14 -154,19 +155,28 @@@ public class TestContainerManagerImpl
containerManager.getContainers(HddsProtos.LifeCycleState.OPEN).size());
Assert.assertEquals(2, containerManager
.getContainers(HddsProtos.LifeCycleState.CLOSING).size());
+ containerManager.updateContainerState(cidArray[1],
+ HddsProtos.LifeCycleEvent.QUASI_CLOSE);
+ containerManager.updateContainerState(cidArray[2],
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ containerManager.updateContainerState(cidArray[2],
+ HddsProtos.LifeCycleEvent.CLOSE);
+ Assert.assertEquals(7, containerManager.
+ getContainerStateCount(HddsProtos.LifeCycleState.OPEN));
+ Assert.assertEquals(1, containerManager
+ .getContainerStateCount(HddsProtos.LifeCycleState.CLOSING));
+ Assert.assertEquals(1, containerManager
+ .getContainerStateCount(HddsProtos.LifeCycleState.QUASI_CLOSED));
+ Assert.assertEquals(1, containerManager
+ .getContainerStateCount(HddsProtos.LifeCycleState.CLOSED));
}
+
+ @Test
+ public void testAllocateContainersWithECReplicationConfig() throws Exception {
+ final ContainerInfo admin = containerManager
+ .allocateContainer(new ECReplicationConfig(3, 2), "admin");
+ Assert.assertEquals(1, containerManager.getContainers().size());
+ Assert.assertNotNull(containerManager.getContainer(admin.containerID()));
+ }
+
}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 00d96dc,c82006e..523efc0
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@@ -28,13 -26,10 +28,13 @@@ import org.apache.hadoop.hdds.scm.metad
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
- import org.apache.hadoop.ozone.ClientVersions;
+ import org.apache.hadoop.ozone.ClientVersion;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 3152c39,3a68db1..fd29929
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@@ -224,19 -220,15 +226,21 @@@ public class TestPipelineManagerImpl
Assert.assertEquals(Pipeline.PipelineState.DORMANT,
pipelineStore.get(pipeline.getId()).getPipelineState());
Assert.assertFalse(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN).contains(pipeline));
- Assert.assertEquals(1, pipelineManager
- .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
++ Assert.assertEquals(1, pipelineManager.getPipelineCount(
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.DORMANT));
pipelineManager.activatePipeline(pipeline.getId());
Assert.assertTrue(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
+ .getPipelines(RatisReplicationConfig
+ .getInstance(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN).contains(pipeline));
- Assert.assertEquals(1, pipelineManager
- .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
++ Assert.assertEquals(1, pipelineManager.getPipelineCount(
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ Pipeline.PipelineState.OPEN));
buffer.flush();
Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
pipelineManager.close();
@@@ -715,43 -724,6 +736,43 @@@
assertTrue(containerLogIdx < pipelineLogIdx);
}
+ @Test
+ public void testCreatePipelineForRead() throws IOException {
+ PipelineManager pipelineManager = createPipelineManager(true);
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(NodeStatus.inServiceHealthy())
+ .stream()
+ .limit(3)
+ .collect(Collectors.toList());
+ Set<ContainerReplica> replicas = createContainerReplicasList(dns);
+ Pipeline pipeline = pipelineManager.createPipelineForRead(
- new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE), replicas);
+ Assert.assertEquals(3, pipeline.getNodes().size());
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ Assert.assertTrue(dns.contains(dn));
+ }
+ }
+
+ private Set<ContainerReplica> createContainerReplicasList(
+ List <DatanodeDetails> dns) {
+ Set<ContainerReplica> replicas = new HashSet<>();
+ for (DatanodeDetails dn : dns) {
+ ContainerReplica r = ContainerReplica.newBuilder()
+ .setBytesUsed(1)
+ .setContainerID(ContainerID.valueOf(1))
+ .setContainerState(StorageContainerDatanodeProtocolProtos
+ .ContainerReplicaProto.State.CLOSED)
+ .setKeyCount(1)
+ .setOriginNodeId(UUID.randomUUID())
+ .setSequenceId(1)
+ .setReplicaIndex(0)
+ .setDatanodeDetails(dn)
+ .build();
+ replicas.add(r);
+ }
+ return replicas;
+ }
+
private void sendPipelineReport(
DatanodeDetails dn, Pipeline pipeline,
PipelineReportHandler pipelineReportHandler,
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
index 11ae86c,0000000..8168ce8
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
@@@ -1,95 -1,0 +1,99 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for PipelineStateMap.
+ */
+
+public class TestPipelineStateMap {
+
+ private PipelineStateMap map;
+
+ @Before
+ public void setup() {
+ map = new PipelineStateMap();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ }
+
+ @Test
+ public void testCountPipelines() throws IOException {
+ Pipeline p;
+
+ // Open Standalone Pipelines
+ map.addPipeline(MockPipeline.createPipeline(1));
+ map.addPipeline(MockPipeline.createPipeline(1));
+ p = MockPipeline.createPipeline(1);
+ map.addPipeline(p);
+ map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+ // Ratis pipeline
+ map.addPipeline(MockPipeline.createRatisPipeline());
+ p = MockPipeline.createRatisPipeline();
+ map.addPipeline(p);
+ map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+ // EC Pipelines
+ map.addPipeline(MockPipeline.createEcPipeline(
+ new ECReplicationConfig(3, 2)));
+ map.addPipeline(MockPipeline.createEcPipeline(
+ new ECReplicationConfig(3, 2)));
+ p = MockPipeline.createEcPipeline(new ECReplicationConfig(3, 2));
+ map.addPipeline(p);
+ map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
- assertEquals(2, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
++ assertEquals(2, map.getPipelineCount(
++ StandaloneReplicationConfig.getInstance(ONE),
+ Pipeline.PipelineState.OPEN));
- assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
++ assertEquals(1, map.getPipelineCount(
++ RatisReplicationConfig.getInstance(THREE),
+ Pipeline.PipelineState.OPEN));
+ assertEquals(2, map.getPipelineCount(new ECReplicationConfig(3, 2),
+ Pipeline.PipelineState.OPEN));
+
+ assertEquals(0, map.getPipelineCount(new ECReplicationConfig(6, 3),
+ Pipeline.PipelineState.OPEN));
+
- assertEquals(1, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
++ assertEquals(1, map.getPipelineCount(
++ StandaloneReplicationConfig.getInstance(ONE),
+ Pipeline.PipelineState.CLOSED));
- assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
++ assertEquals(1, map.getPipelineCount(
++ RatisReplicationConfig.getInstance(THREE),
+ Pipeline.PipelineState.CLOSED));
+ assertEquals(1, map.getPipelineCount(new ECReplicationConfig(3, 2),
+ Pipeline.PipelineState.CLOSED));
+ }
+
+
+}
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index fa8da28,d2b19fb..c0ce8d4
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@@ -220,17 -219,15 +224,20 @@@ public class TestRatisPipelineProvider
List<DatanodeDetails> healthyNodes = nodeManager
.getNodes(NodeStatus.inServiceHealthy()).stream()
.limit(3).collect(Collectors.toList());
+ Set<ContainerReplica> replicas = createContainerReplicas(healthyNodes);
Pipeline pipeline1 = provider.create(
- new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ healthyNodes);
Pipeline pipeline2 = provider.create(
- new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
+ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
+ healthyNodes);
+ Pipeline pipeline3 = provider.createForRead(
- new RatisReplicationConfig(ReplicationFactor.THREE), replicas);
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
++ replicas);
Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
+ Assert.assertEquals(pipeline2.getNodeSet(), pipeline3.getNodeSet());
cleanup();
}
@@@ -284,31 -281,6 +291,31 @@@
}
@Test
+ // Test excluded nodes work correctly. Note that for Ratis, the
+ // PipelinePlacementPolicy, which Ratis is hardcoded to use, does not consider
+ // favored nodes.
+ public void testCreateFactorTHREEPipelineWithExcludedDatanodes()
+ throws Exception {
+ init(1);
+ int healthyCount = nodeManager.getNodes(NodeStatus.inServiceHealthy())
+ .size();
+ // Add all but 3 nodes to the exclude list and ensure that the 3 picked
+ // nodes are not in the excluded list.
+ List<DatanodeDetails> excludedNodes = nodeManager
+ .getNodes(NodeStatus.inServiceHealthy()).stream()
+ .limit(healthyCount - 3).collect(Collectors.toList());
+
+ Pipeline pipeline1 = provider.create(
- new RatisReplicationConfig(ReplicationFactor.THREE), excludedNodes,
- Collections.EMPTY_LIST);
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE),
++ excludedNodes, Collections.EMPTY_LIST);
+
+ for (DatanodeDetails dn : pipeline1.getNodes()) {
+ assertFalse(excludedNodes.contains(dn));
+ }
+ }
+
+
+ @Test
public void testCreatePipelinesWhenNotEnoughSpace() throws Exception {
String expectedErrorSubstring = "Unable to find enough" +
" nodes that meet the space requirement";
@@@ -319,11 -291,8 +326,11 @@@
largeContainerConf.set(OZONE_SCM_CONTAINER_SIZE, "100TB");
init(1, largeContainerConf);
for (ReplicationFactor factor: ReplicationFactor.values()) {
+ if (factor == ReplicationFactor.ZERO) {
+ continue;
+ }
try {
- provider.create(new RatisReplicationConfig(factor));
+ provider.create(RatisReplicationConfig.getInstance(factor));
Assert.fail("Expected SCMException for large container size with " +
"replication factor " + factor.toString());
} catch (SCMException ex) {
@@@ -335,11 -304,8 +342,11 @@@
largeMetadataConf.set(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN, "100TB");
init(1, largeMetadataConf);
for (ReplicationFactor factor: ReplicationFactor.values()) {
+ if (factor == ReplicationFactor.ZERO) {
+ continue;
+ }
try {
- provider.create(new RatisReplicationConfig(factor));
+ provider.create(RatisReplicationConfig.getInstance(factor));
Assert.fail("Expected SCMException for large metadata size with " +
"replication factor " + factor.toString());
} catch (SCMException ex) {
diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
index b3ce216,0000000..ae997a3
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@@ -1,446 -1,0 +1,446 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
- import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
++import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests to validate the WritableECContainerProvider works correctly.
+ */
+public class TestWritableECContainerProvider {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestWritableECContainerProvider.class);
+ private static final String OWNER = "SCM";
+ private PipelineManager pipelineManager;
+ private ContainerManager containerManager
+ = Mockito.mock(ContainerManager.class);
+ private PipelineChoosePolicy pipelineChoosingPolicy
+ = new HealthyPipelineChoosePolicy();
+
+ private OzoneConfiguration conf;
+ private DBStore dbStore;
+ private SCMHAManager scmhaManager;
+ private NodeManager nodeManager;
+ private WritableContainerProvider provider;
+ private ReplicationConfig repConfig;
+ private int minPipelines;
+
+ private Map<ContainerID, ContainerInfo> containers;
+
+ @Before
+ public void setup() throws IOException {
+ repConfig = new ECReplicationConfig(3, 2);
+ conf = new OzoneConfiguration();
+ WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
+ conf.getObject(WritableECContainerProvider
+ .WritableECContainerProviderConfig.class);
+ minPipelines = providerConf.getMinimumPipelines();
+ containers = new HashMap<>();
+ File testDir = GenericTestUtils.getTestDir(
+ TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+ conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(
+ conf, new SCMDBDefinition());
- scmhaManager = MockSCMHAManager.getInstance(true);
++ scmhaManager = SCMHAManagerStub.getInstance(true);
+ nodeManager = new MockNodeManager(true, 10);
+ pipelineManager =
+ new MockPipelineManager(dbStore, scmhaManager, nodeManager);
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ Mockito.doAnswer(call -> {
+ Pipeline pipeline = (Pipeline)call.getArguments()[2];
+ ContainerInfo container = createContainer(pipeline,
+ repConfig, System.nanoTime());
+ pipelineManager.addContainerToPipeline(
+ pipeline.getId(), container.containerID());
+ containers.put(container.containerID(), container);
+ return container;
+ }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
+ Matchers.anyString(), Matchers.any(Pipeline.class));
+
+ Mockito.doAnswer(call ->
+ containers.get((ContainerID)call.getArguments()[0]))
+ .when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+ }
+
+ @Test
+ public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+ throws IOException {
+ // The first 5 calls should return a different container
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+
+ allocatedContainers.clear();
+ for (int i = 0; i < 20; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // Should have minPipelines open pipelines created
+ assertEquals(minPipelines,
+ pipelineManager.getPipelines(repConfig, OPEN).size());
+ // We should have more than 2 allocatedContainers in the set proving a
+ // random container is selected each time. Do not check for 5 here as there
+ // is a reasonable chance that in 20 turns we don't pick all 5 nodes.
+ assertTrue(allocatedContainers.size() > 2);
+ }
+
+ @Test
+ public void testPiplineLimitIgnoresExcludedPipelines() throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // We have the min limit of pipelines, but then exclude one. It should use
+ // one of the existing rather than creating a new one, as the limit is
+ // checked against all pipelines, not just the filtered list
+ ExcludeList exclude = new ExcludeList();
+ PipelineID excludedID = allocatedContainers
+ .stream().findFirst().get().getPipelineID();
+ exclude.addPipeline(excludedID);
+
+ ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+ assertNotEquals(excludedID, c.getPipelineID());
+ assertTrue(allocatedContainers.contains(c));
+ }
+
+ @Test
+ public void testNewPipelineCreatedIfAllPipelinesExcluded()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // We have the min limit of pipelines, but then exclude the pipeline of
+ // every container allocated so far. None of the existing ones can be used,
+ // so the provider must return a container that is not in the allocated set.
+ ExcludeList exclude = new ExcludeList();
+ for (ContainerInfo c : allocatedContainers) {
+ exclude.addPipeline(c.getPipelineID());
+ }
+ ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+ assertFalse(allocatedContainers.contains(c));
+ }
+
+ @Test
+ public void testNewPipelineCreatedIfAllContainersExcluded()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // We have the min limit of pipelines, but then exclude every container
+ // allocated so far. None of the existing ones can be used, so the provider
+ // must return a container that is not in the allocated set.
+ ExcludeList exclude = new ExcludeList();
+ for (ContainerInfo c : allocatedContainers) {
+ exclude.addConatinerId(c.containerID());
+ }
+ ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+ assertFalse(allocatedContainers.contains(c));
+ }
+
+ @Test
+ public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
- pipelineManager =
- new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++ pipelineManager = new MockPipelineManager(
++ dbStore, scmhaManager, nodeManager) {
+ @Override
+ public Pipeline createPipeline(ReplicationConfig repConf,
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes) throws IOException {
+ throw new IOException("Cannot create pipelines");
+ }
+ };
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNull(container);
+ }
+
+ @Test
+ public void testExistingPipelineReturnedWhenNewCannotBeCreated()
+ throws IOException {
- pipelineManager =
- new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++ pipelineManager = new MockPipelineManager(
++ dbStore, scmhaManager, nodeManager) {
+
+ private boolean throwError = false;
+
+ @Override
+ public Pipeline createPipeline(ReplicationConfig repConf,
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes) throws IOException {
+ if (throwError) {
+ throw new IOException("Cannot create pipelines");
+ }
+ throwError = true;
+ return super.createPipeline(repConfig);
+ }
+ };
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ for (int i = 0; i < 5; i++) {
+ ContainerInfo nextContainer =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertEquals(container, nextContainer);
+ }
+ }
+
+ @Test
+ public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+ // Update all the containers to make them nearly full (30MB short of the
+ // max container size), but with enough space for an EC block to be striped
+ // across them.
+ for (ContainerInfo c : allocatedContainers) {
+ c.setUsedBytes(getMaxContainerSize() - 30 * 1024 * 1024);
+ }
+
+ // Request a container with 50MB of space and ensure it is one of the
+ // original set. We ask for 50MB, but as it is striped across the EC group
+ // it will actually need 50MB / dataNum space
+ ContainerInfo newContainer =
+ provider.getContainer(50 * 1024 * 1024, repConfig, OWNER,
+ new ExcludeList());
+ assertNotNull(newContainer);
+ assertTrue(allocatedContainers.contains(newContainer));
+ // Now request 128MB, for which there is not enough space in the existing
+ // containers, and ensure a new container gets created.
+ newContainer = provider.getContainer(
+ 128 * 1024 * 1024, repConfig, OWNER, new ExcludeList());
+ assertNotNull(newContainer);
+ assertFalse(allocatedContainers.contains(newContainer));
+ // The original pipelines should all be closed, triggered by the lack of
+ // space.
+ for (ContainerInfo c : allocatedContainers) {
+ Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+ assertEquals(CLOSED, pipeline.getPipelineState());
+ }
+ }
+
+ @Test
+ public void testPipelineNotFoundWhenAttemptingToUseExisting()
+ throws IOException {
+ // Ensure PM throws PipelineNotFoundException (PNF) when we ask for the
+ // containers in the pipeline
- pipelineManager =
- new MockPipelineManager(dbStore, scmhaManager, nodeManager) {
++ pipelineManager = new MockPipelineManager(
++ dbStore, scmhaManager, nodeManager) {
+
+ @Override
+ public NavigableSet<ContainerID> getContainersInPipeline(
+ PipelineID pipelineID) throws IOException {
+ throw new PipelineNotFoundException("Simulated exception");
+ }
+ };
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+ // Now attempt to get a container - any attempt to use an existing one will
+ // throw PNF and then we must allocate a new one
+ ContainerInfo newContainer =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNotNull(newContainer);
+ assertFalse(allocatedContainers.contains(newContainer));
+ }
+
+ @Test
+ public void testContainerNotFoundWhenAttemptingToUseExisting()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+
+ // Ensure ContainerManager always throws when a container is requested so
+ // existing pipelines cannot be used
+ Mockito.doAnswer(call -> {
+ throw new ContainerNotFoundException();
+ }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+ ContainerInfo newContainer =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNotNull(newContainer);
+ assertFalse(allocatedContainers.contains(newContainer));
+
+ // Ensure all the existing pipelines are closed
+ for (ContainerInfo c : allocatedContainers) {
+ Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+ assertEquals(CLOSED, pipeline.getPipelineState());
+ }
+ }
+
+ @Test
+ public void testPipelineOpenButContainerRemovedFromIt() throws IOException {
+ // This can happen if the container close process is triggered from the DN.
+ // When that happens, CM will change the container state to CLOSING and
+ // remove it from the container list in pipeline Manager.
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i = 0; i < minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ // Remove the container from the pipeline to simulate closing it
+ pipelineManager.removeContainerFromPipeline(
+ container.getPipelineID(), container.containerID());
+ }
+ // With no containers left in any pipeline, a new container is expected and
+ // the original pipelines should all end up closed.
+ ContainerInfo newContainer = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(newContainer));
+ for (ContainerInfo c : allocatedContainers) {
+ Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+ assertEquals(CLOSED, pipeline.getPipelineState());
+ }
+ }
+
+ @Test
+ public void testExcludedNodesPassedToCreatePipelineIfProvided()
+ throws IOException {
+ PipelineManager pipelineManagerSpy = Mockito.spy(pipelineManager);
+ provider = new WritableECContainerProvider(
+ conf, pipelineManagerSpy, containerManager, pipelineChoosingPolicy);
+ ExcludeList excludeList = new ExcludeList();
+
+ // EmptyList should be passed if there are no nodes excluded.
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, excludeList);
+ assertNotNull(container);
+
+ verify(pipelineManagerSpy).createPipeline(repConfig,
+ Collections.emptyList(), Collections.emptyList());
+
+ // If nodes are excluded then the excluded nodes should be passed through to
+ // the create pipeline call.
+ excludeList.addDatanode(MockDatanodeDetails.randomDatanodeDetails());
+ List<DatanodeDetails> excludedNodes =
+ new ArrayList<>(excludeList.getDatanodes());
+
+ container = provider.getContainer(
+ 1, repConfig, OWNER, excludeList);
+ assertNotNull(container);
+ verify(pipelineManagerSpy).createPipeline(repConfig, excludedNodes,
+ Collections.emptyList());
+ }
+
+ private ContainerInfo createContainer(Pipeline pipeline,
+ ReplicationConfig repConf, long containerID) {
+ return new ContainerInfo.Builder()
+ .setContainerID(containerID)
+ .setOwner(OWNER)
+ .setReplicationConfig(repConf)
+ .setState(HddsProtos.LifeCycleState.OPEN)
+ .setPipelineID(pipeline.getId())
+ .setNumberOfKeys(0)
+ .setUsedBytes(0)
+ .setSequenceId(0)
+ .setDeleteTransactionId(0)
+ .build();
+ }
+
+ private long getMaxContainerSize() {
+ return (long)conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+ }
+
+}
diff --cc hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
index 2ea34b9,a7960d8..10b5758
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/container/TestInfoSubCommand.java
@@@ -246,26 -207,6 +246,26 @@@ public class TestInfoSubCommand
return new ContainerWithPipeline(container, pipeline);
}
+ private ContainerWithPipeline getECContainerWithPipeline() {
+ Pipeline pipeline = new Pipeline.Builder()
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setReplicationConfig(new ECReplicationConfig(3, 2))
+ .setId(PipelineID.randomId())
+ .setNodes(datanodes)
+ .build();
+
+ ContainerInfo container = new ContainerInfo.Builder()
+ .setSequenceId(1)
+ .setPipelineID(pipeline.getId())
+ .setUsedBytes(1234)
- .setReplicationConfig(new RatisReplicationConfig(THREE))
++ .setReplicationConfig(RatisReplicationConfig.getInstance(THREE))
+ .setNumberOfKeys(1)
+ .setState(CLOSED)
+ .build();
+
+ return new ContainerWithPipeline(container, pipeline);
+ }
+
private List<DatanodeDetails> createDatanodeDetails(int count) {
List<DatanodeDetails> dns = new ArrayList<>();
for (int i = 0; i < count; i++) {
diff --cc hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
index 9d70f3a,0000000..acc3bda
mode 100644,000000..100644
--- a/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
+++ b/hadoop-hdds/tools/src/test/java/org/apache/hadoop/hdds/scm/cli/pipeline/TestListPipelinesSubCommand.java
@@@ -1,192 -1,0 +1,192 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import picocli.CommandLine;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the ListPipelinesSubcommand class.
+ */
+public class TestListPipelinesSubCommand {
+
+ private ListPipelinesSubcommand cmd;
+ private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
+ private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
+ private final PrintStream originalOut = System.out;
+ private final PrintStream originalErr = System.err;
+ private static final String DEFAULT_ENCODING = StandardCharsets.UTF_8.name();
+ private ScmClient scmClient;
+
+ @Before
+ public void setup() throws IOException {
+ cmd = new ListPipelinesSubcommand();
+ System.setOut(new PrintStream(outContent, false, DEFAULT_ENCODING));
+ System.setErr(new PrintStream(errContent, false, DEFAULT_ENCODING));
+
+ scmClient = mock(ScmClient.class);
+ Mockito.when(scmClient.listPipelines())
+ .thenAnswer(invocation -> createPipelines());
+ }
+
+ @After
+ public void tearDown() {
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+ }
+
+ @Test
+ public void testAllPipelinesReturnedWithNoFilter() throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs();
+ cmd.execute(scmClient);
+ Assert.assertEquals(6, outContent.toString(DEFAULT_ENCODING)
+ .split(System.getProperty("line.separator")).length);
+ }
+
+ @Test
+ public void testOnlyOpenReturned() throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs("-s", "OPEN");
+ cmd.execute(scmClient);
+ String output = outContent.toString(DEFAULT_ENCODING);
+ Assert.assertEquals(3, output.split(
+ System.getProperty("line.separator")).length);
+ Assert.assertEquals(-1, output.indexOf("CLOSED"));
+ }
+
+ @Test(expected = IOException.class)
+ public void testExceptionIfReplicationWithoutType() throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs("-r", "THREE");
+ cmd.execute(scmClient);
+ }
+
+ @Test
+ public void testReplicationAndType() throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs("-r", "THREE", "-t", "RATIS");
+ cmd.execute(scmClient);
+
+ String output = outContent.toString(DEFAULT_ENCODING);
+ Assert.assertEquals(2, output.split(
+ System.getProperty("line.separator")).length);
+ Assert.assertEquals(-1, output.indexOf("EC"));
+ }
+
+ @Test
+ public void testReplicationAndTypeEC() throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs("-r", "rs-6-3-1024k", "-t", "EC");
+ cmd.execute(scmClient);
+
+ String output = outContent.toString(DEFAULT_ENCODING);
+ Assert.assertEquals(1, output.split(
+ System.getProperty("line.separator")).length);
+ Assert.assertEquals(-1,
+ output.indexOf("ReplicationConfig: RATIS"));
+ }
+
+ @Test
+ public void testReplicationAndTypeAndState() throws IOException {
+ CommandLine c = new CommandLine(cmd);
+ c.parseArgs("-r", "THREE", "-t", "RATIS", "-s", "OPEN");
+ cmd.execute(scmClient);
+
+ String output = outContent.toString(DEFAULT_ENCODING);
+ Assert.assertEquals(1, output.split(
+ System.getProperty("line.separator")).length);
+ Assert.assertEquals(-1, output.indexOf("CLOSED"));
+ Assert.assertEquals(-1, output.indexOf("EC"));
+ }
+
+ private List<Pipeline> createPipelines() {
+ List<Pipeline> pipelines = new ArrayList<>();
- pipelines.add(createPipeline(
- new StandaloneReplicationConfig(ONE), Pipeline.PipelineState.OPEN));
- pipelines.add(createPipeline(
- new RatisReplicationConfig(THREE), Pipeline.PipelineState.OPEN));
- pipelines.add(createPipeline(
- new RatisReplicationConfig(THREE), Pipeline.PipelineState.CLOSED));
++ pipelines.add(createPipeline(StandaloneReplicationConfig.getInstance(ONE),
++ Pipeline.PipelineState.OPEN));
++ pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
++ Pipeline.PipelineState.OPEN));
++ pipelines.add(createPipeline(RatisReplicationConfig.getInstance(THREE),
++ Pipeline.PipelineState.CLOSED));
+
+ pipelines.add(createPipeline(
+ new ECReplicationConfig(3, 2), Pipeline.PipelineState.OPEN));
+ pipelines.add(createPipeline(
+ new ECReplicationConfig(3, 2), Pipeline.PipelineState.CLOSED));
+ pipelines.add(createPipeline(
+ new ECReplicationConfig(6, 3), Pipeline.PipelineState.CLOSED));
+
+ return pipelines;
+ }
+
+ private Pipeline createPipeline(ReplicationConfig repConfig,
+ Pipeline.PipelineState state) {
+ return new Pipeline.Builder()
+ .setId(PipelineID.randomId())
+ .setCreateTimestamp(System.currentTimeMillis())
+ .setState(state)
+ .setReplicationConfig(repConfig)
+ .setNodes(createDatanodeDetails(1))
+ .build();
+ }
+
+ /**
+ * Builds a list of datanodes for use in test pipelines. Each node gets a
+ * host name of "host" + i, an IP address of 1.2.3.(i + 1), a single "ratis"
+ * port and a random UUID.
+ * @param count The number of datanodes to create.
+ * @return The list of created datanodes.
+ */
+ private List<DatanodeDetails> createDatanodeDetails(int count) {
+ List<DatanodeDetails> dns = new ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ // The parentheses around (i + 1) are required: without them the
+ // concatenation is evaluated left to right and yields "1.2.3.01" rather
+ // than "1.2.3.1" for i = 0.
+ HddsProtos.DatanodeDetailsProto dnd =
+ HddsProtos.DatanodeDetailsProto.newBuilder()
+ .setHostName("host" + i)
+ .setIpAddress("1.2.3." + (i + 1))
+ .setNetworkLocation("/default")
+ .setNetworkName("host" + i)
+ .addPorts(HddsProtos.Port.newBuilder()
+ .setName("ratis").setValue(5678).build())
+ .setUuid(UUID.randomUUID().toString())
+ .build();
+ dns.add(DatanodeDetails.getFromProtoBuf(dnd));
+ }
+ return dns;
+ }
+}
+
+
diff --cc hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
index 405182f,0000000..b0e9755
mode 100644,000000..100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/ECBlockInputStream.java
@@@ -1,419 -1,0 +1,420 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.io;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.XceiverClientFactory;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.ByteReaderStrategy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.function.Function;
+
+/**
+ * Class to read data from an EC Block Group.
+ */
+public class ECBlockInputStream extends BlockExtendedInputStream {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(ECBlockInputStream.class);
+
+ private final ECReplicationConfig repConfig;
+ private final int ecChunkSize;
+ private final long stripeSize;
+ private final BlockInputStreamFactory streamFactory;
+ private final boolean verifyChecksum;
+ private final XceiverClientFactory xceiverClientFactory;
+ private final Function<BlockID, Pipeline> refreshFunction;
+ private final OmKeyLocationInfo blockInfo;
+ private final DatanodeDetails[] dataLocations;
+ private final BlockExtendedInputStream[] blockStreams;
+ private final int maxLocations;
+
+ private long position = 0;
+ private boolean closed = false;
+ private boolean seeked = false;
+
+ protected OmKeyLocationInfo getBlockInfo() {
+ return blockInfo;
+ }
+
+ protected ECReplicationConfig getRepConfig() {
+ return repConfig;
+ }
+
+ protected DatanodeDetails[] getDataLocations() {
+ return dataLocations;
+ }
+
+ protected long getStripeSize() {
+ return stripeSize;
+ }
+
+ /**
+ * Returns the number of available data locations, taking account of the
+ * expected number of locations. Eg, if the block is less than 1 EC chunk,
+ * we only expect 1 data location. If it is between 1 and 2 chunks, we expect
+ * there to be 2 locations, and so on.
+ * @param expectedLocations The maximum number of allowed data locations,
+ * depending on the block size.
+ * @return The number of available data locations.
+ */
+ protected int availableDataLocations(int expectedLocations) {
+ int count = 0;
+ for (int i = 0; i < repConfig.getData() && i < expectedLocations; i++) {
+ if (dataLocations[i] != null) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ protected int availableParityLocations() {
+ int count = 0;
+ for (int i = repConfig.getData();
+ i < repConfig.getData() + repConfig.getParity(); i++) {
+ if (dataLocations[i] != null) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ public ECBlockInputStream(ECReplicationConfig repConfig,
+ OmKeyLocationInfo blockInfo, boolean verifyChecksum,
+ XceiverClientFactory xceiverClientFactory, Function<BlockID,
+ Pipeline> refreshFunction, BlockInputStreamFactory streamFactory) {
+ this.repConfig = repConfig;
+ this.ecChunkSize = repConfig.getEcChunkSize();
+ this.verifyChecksum = verifyChecksum;
+ this.blockInfo = blockInfo;
+ this.streamFactory = streamFactory;
+ this.xceiverClientFactory = xceiverClientFactory;
+ this.refreshFunction = refreshFunction;
+ this.maxLocations = repConfig.getData() + repConfig.getParity();
+ this.dataLocations = new DatanodeDetails[repConfig.getRequiredNodes()];
+ this.blockStreams =
+ new BlockExtendedInputStream[repConfig.getRequiredNodes()];
+
+ this.stripeSize = (long)ecChunkSize * repConfig.getData();
+ setBlockLocations(this.blockInfo.getPipeline());
+ }
+
+ public synchronized boolean hasSufficientLocations() {
+ // The number of locations needed is a function of the EC Chunk size. If the
+ // block length is <= the chunk size, we should only have location 1. If it
+ // is greater than the chunk size but less than chunk_size * 2, then we must
+ // have two locations. If it is greater than chunk_size * data_num, then we
+ // must have all data_num locations.
+ // We only consider data locations here.
+ int expectedDataBlocks = calculateExpectedDataBlocks(repConfig);
+ return expectedDataBlocks == availableDataLocations(expectedDataBlocks);
+ }
+
+ protected int calculateExpectedDataBlocks(ECReplicationConfig rConfig) {
+ return ECBlockInputStreamProxy.expectedDataLocations(rConfig, getLength());
+ }
+
+ /**
+ * Using the current position, returns the index of the blockStream we should
+ * be reading from. This is the index in the internal array holding the
+ * stream reference. The block group index will be one greater than this.
+ * @return The zero-based index of the data block stream containing the
+ * current position, in the range 0 to repConfig.getData() - 1.
+ */
+ protected int currentStreamIndex() {
+ return (int)((position / ecChunkSize) % repConfig.getData());
+ }
+
+ /**
+ * Returns the internal block stream for the given location index. If the
+ * stream has not been opened already, it is created via the stream factory
+ * and stored in the blockStreams array so later calls reuse it.
+ * @param locationIndex Zero-based index into the dataLocations and
+ * blockStreams arrays.
+ * @return BlockInput stream to read from.
+ */
+ protected BlockExtendedInputStream getOrOpenStream(int locationIndex) {
+ BlockExtendedInputStream stream = blockStreams[locationIndex];
+ if (stream == null) {
+ // To read an EC block, we create a STANDALONE pipeline that contains the
+ // single location for the block index we want to read. The EC blocks are
+ // indexed from 1 to N, however the data locations are stored in the
+ // dataLocations array indexed from zero.
+ Pipeline pipeline = Pipeline.newBuilder()
- .setReplicationConfig(new StandaloneReplicationConfig(
++ .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+ HddsProtos.ReplicationFactor.ONE))
+ .setNodes(Arrays.asList(dataLocations[locationIndex]))
+ .setId(PipelineID.randomId())
+ .setState(Pipeline.PipelineState.CLOSED)
+ .build();
+
+ // The length passed on is that of the internal block, not the whole
+ // block group; internalBlockLength expects the 1-based block index.
+ OmKeyLocationInfo blkInfo = new OmKeyLocationInfo.Builder()
+ .setBlockID(blockInfo.getBlockID())
+ .setLength(internalBlockLength(locationIndex + 1))
+ .setPipeline(blockInfo.getPipeline())
+ .setToken(blockInfo.getToken())
+ .setPartNumber(blockInfo.getPartNumber())
+ .build();
+ stream = streamFactory.create(
- new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
++ StandaloneReplicationConfig.getInstance(
++ HddsProtos.ReplicationFactor.ONE),
+ blkInfo, pipeline,
+ blockInfo.getToken(), verifyChecksum, xceiverClientFactory,
+ refreshFunction);
+ blockStreams[locationIndex] = stream;
+ }
+ return stream;
+ }
+
+ /**
+ * Returns the length of the Nth block in the block group, taking account of a
+ * potentially partial last stripe. Note that the internal block index is
+ * numbered starting from 1.
+ * @param index - Index number of the internal block, starting from 1
+ * @return The length in bytes of the internal block. Any index greater than
+ * the number of data blocks is treated as a parity block and gets
+ * the same length as block 1.
+ */
+ protected long internalBlockLength(int index) {
+ // Bytes in the final, possibly partial, stripe
+ long lastStripe = blockInfo.getLength() % stripeSize;
+ // Bytes each block receives from the full stripes
+ long blockSize = (blockInfo.getLength() - lastStripe) / repConfig.getData();
+ // 1-based index of the cell in the last stripe that is only partly filled
+ long lastCell = lastStripe / ecChunkSize + 1;
+ long lastCellLength = lastStripe % ecChunkSize;
+
+ if (index > repConfig.getData()) {
+ // It's a parity block and their size is driven by the size of the
+ // first block of the block group. All parity blocks have the same size
+ // as block_1.
+ index = 1;
+ }
+
+ if (index < lastCell) {
+ // Blocks before the partial cell hold a full cell from the last stripe
+ return blockSize + ecChunkSize;
+ } else if (index == lastCell) {
+ return blockSize + lastCellLength;
+ } else {
+ // Blocks after the partial cell hold nothing from the last stripe
+ return blockSize;
+ }
+ }
+
+ private void setBlockLocations(Pipeline pipeline) {
+ for (DatanodeDetails node : pipeline.getNodes()) {
+ int index = pipeline.getReplicaIndex(node);
+ addBlockLocation(index, node);
+ }
+ }
+
+ /**
+ * Records the datanode holding the replica with the given index. Replica
+ * indexes start at 1, while the dataLocations array is indexed from zero.
+ * @param index Replica index of the location, from 1 to maxLocations.
+ * @param location The datanode holding that replica.
+ * @throws IndexOutOfBoundsException if the index is less than 1 or greater
+ * than maxLocations.
+ */
+ private void addBlockLocation(int index, DatanodeDetails location) {
+ if (index < 1) {
+ // Without this check an index of zero or below would fail on the array
+ // access with no indication of which replica index was at fault.
+ throw new IndexOutOfBoundsException("The index " + index + " is less "
+ + "than 1, the first valid replica index for the EC Replication "
+ + "Config (" + repConfig + ")");
+ }
+ if (index > maxLocations) {
+ throw new IndexOutOfBoundsException("The index " + index + " is greater "
+ + "than the EC Replication Config (" + repConfig + ")");
+ }
+ dataLocations[index - 1] = location;
+ }
+
+ protected long blockLength() {
+ return blockInfo.getLength();
+ }
+
+ protected long remaining() {
+ return blockLength() - position;
+ }
+
+ /**
+ * Read from the internal BlockInputStreams one EC cell at a time into the
+ * strategy buffer. This call may read from several internal BlockInputStreams
+ * if there is sufficient space in the buffer.
+ * @param strategy The reader strategy to read data into; must not be null.
+ * @return The total number of bytes read, or EOF if there is no data
+ * remaining in the block group.
+ * @throws IOException if this stream is closed. An IOException raised while
+ * opening or reading an internal stream is wrapped in a
+ * BadDataLocationException along with the location being read.
+ */
+ @Override
+ protected synchronized int readWithStrategy(ByteReaderStrategy strategy)
+ throws IOException {
+ Preconditions.checkArgument(strategy != null);
+ checkOpen();
+
+ if (remaining() == 0) {
+ return EOF;
+ }
+
+ int totalRead = 0;
+ // Keep reading until the buffer is full or the block group is exhausted
+ while (strategy.getTargetLength() > 0 && remaining() > 0) {
+ try {
+ int currentIndex = currentStreamIndex();
+ BlockExtendedInputStream stream = getOrOpenStream(currentIndex);
+ int read = readFromStream(stream, strategy);
+ totalRead += read;
+ position += read;
+ } catch (IOException ioe) {
+ // position is only advanced after a successful read, so the current
+ // stream index still identifies the location that failed.
+ throw new BadDataLocationException(
+ dataLocations[currentStreamIndex()], ioe);
+ }
+ }
+ return totalRead;
+ }
+
+ @Override
+ public synchronized long getRemaining() {
+ return blockInfo.getLength() - position;
+ }
+
+ @Override
+ public synchronized long getLength() {
+ return blockInfo.getLength();
+ }
+
+ @Override
+ public BlockID getBlockID() {
+ return blockInfo.getBlockID();
+ }
+
+ protected void seekStreamIfNecessary(BlockExtendedInputStream stream,
+ long partialChunkSize) throws IOException {
+ if (seeked) {
+ // Seek on the underlying streams is performed lazily, as there is a
+ // possibility a read after a seek may only read a small amount of data.
+ // Once this block stream has been seeked, we always check the position,
+ // but in the usual case, where there are no seeks at all, we don't need
+ // to do this extra work.
+ long basePosition = (position / stripeSize) * (long)ecChunkSize;
+ long streamPosition = basePosition + partialChunkSize;
+ if (streamPosition != stream.getPos()) {
+ // This ECBlockInputStream has been seeked, so the underlying
+ // block stream is no longer at the correct position. Therefore we need
+ // to seek it too.
+ stream.seek(streamPosition);
+ }
+ }
+ }
+
+ /**
+ * Read the most allowable amount of data from the current stream. This
+ * ensures we don't read past the end of an EC cell or the overall block
+ * group length.
+ * @param stream Stream to read from
+ * @param strategy The ReaderStrategy to read data into
+ * @return The number of bytes read from the stream.
+ * @throws IOException if the read fails, or if the stream reports EOF when
+ * more data was expected.
+ */
+ private int readFromStream(BlockExtendedInputStream stream,
+ ByteReaderStrategy strategy)
+ throws IOException {
+ long partialPosition = position % ecChunkSize;
+ seekStreamIfNecessary(stream, partialPosition);
+ long ecLimit = ecChunkSize - partialPosition;
+ // Free space in the buffer to read into
+ long bufLimit = strategy.getTargetLength();
+ // How much we can read, the lower of the EC Cell, buffer and overall block
+ // remaining.
+ int expectedRead = (int)Math.min(Math.min(ecLimit, bufLimit), remaining());
+ int actualRead = strategy.readFromBlock(stream, expectedRead);
+ if (actualRead == -1) {
+ // The Block Stream reached EOF, but we did not expect it to, so the block
+ // might be corrupt. The index is parenthesised so the 1-based block index
+ // is added numerically rather than appended as the character "1".
+ throw new IOException("Expected to read " + expectedRead + " but got EOF"
+ + " from blockGroup " + stream.getBlockID() + " index "
+ + (currentStreamIndex() + 1));
+ }
+ return actualRead;
+ }
+
+ /**
+ * Verify that the input stream is open.
+ * @throws IOException if the connection is closed.
+ */
+ private void checkOpen() throws IOException {
+ if (closed) {
+ throw new IOException(
+ ": " + FSExceptionMessages.STREAM_IS_CLOSED + " Block: "
+ + blockInfo.getBlockID());
+ }
+ }
+
+ @Override
+ public synchronized void close() {
+ closeStreams();
+ closed = true;
+ }
+
+ protected synchronized void closeStreams() {
+ for (int i = 0; i < blockStreams.length; i++) {
+ if (blockStreams[i] != null) {
+ try {
+ blockStreams[i].close();
+ blockStreams[i] = null;
+ } catch (IOException e) {
+ LOG.error("Failed to close stream {}", blockStreams[i], e);
+ }
+ }
+ }
+ // If the streams have been closed outside of a close() call, then it may
+ // be due to freeing resources. If they are reopened, then we will need to
+ // seek the stream to its expected position when the next read is attempted.
+ seeked = true;
+ }
+
+ @Override
+ public synchronized void unbuffer() {
+ for (BlockExtendedInputStream stream : blockStreams) {
+ if (stream != null) {
+ stream.unbuffer();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void seek(long pos) throws IOException {
+ checkOpen();
+ if (pos < 0 || pos >= getLength()) {
+ if (pos == 0) {
+ // It is possible for length and pos to be zero in which case
+ // seek should return instead of throwing exception
+ return;
+ }
+ throw new EOFException(
+ "EOF encountered at pos: " + pos + " for block: "
+ + blockInfo.getBlockID());
+ }
+ position = pos;
+ seeked = true;
+ }
+
+ @Override
+ public synchronized long getPos() {
+ return position;
+ }
+
+ protected synchronized void setPos(long pos) {
+ position = pos;
+ }
+
+ @Override
+ public synchronized boolean seekToNewSource(long l) throws IOException {
+ return false;
+ }
- }
++}
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
index a742ab9,650fc91..aa64fd2
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmBucketInfo.java
@@@ -115,52 -111,4 +115,56 @@@ public class TestOmBucketInfo
Assert.assertEquals((int) 1, cloneBucketInfo.getAcls().size());
}
+
+ @Test
+ public void getProtobufMessageEC() {
+ OmBucketInfo omBucketInfo =
+ OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
+ .setCreationTime(Time.now()).setIsVersionEnabled(false)
+ .setStorageType(StorageType.ARCHIVE).setAcls(Collections
+ .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
+ "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
+ OzoneAcl.AclScope.ACCESS))).build();
+ OzoneManagerProtocolProtos.BucketInfo protobuf = omBucketInfo.getProtobuf();
+ // No EC Config
+ Assert.assertFalse(protobuf.hasDefaultReplicationConfig());
+
+ // Reconstruct object from Proto
+ OmBucketInfo recovered = OmBucketInfo.getFromProtobuf(protobuf);
+ Assert.assertNull(recovered.getDefaultReplicationConfig());
+
+ // EC Config
- omBucketInfo =
- OmBucketInfo.newBuilder().setBucketName("bucket").setVolumeName("vol1")
- .setCreationTime(Time.now()).setIsVersionEnabled(false)
- .setStorageType(StorageType.ARCHIVE).setAcls(Collections
- .singletonList(new OzoneAcl(IAccessAuthorizer.ACLIdentityType.USER,
- "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
- OzoneAcl.AclScope.ACCESS))).setDefaultReplicationConfig(
++ omBucketInfo = OmBucketInfo.newBuilder()
++ .setBucketName("bucket")
++ .setVolumeName("vol1")
++ .setCreationTime(Time.now())
++ .setIsVersionEnabled(false)
++ .setStorageType(StorageType.ARCHIVE)
++ .setAcls(Collections.singletonList(new OzoneAcl(
++ IAccessAuthorizer.ACLIdentityType.USER,
++ "defaultUser", IAccessAuthorizer.ACLType.WRITE_ACL,
++ OzoneAcl.AclScope.ACCESS)))
++ .setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2))).build();
+ protobuf = omBucketInfo.getProtobuf();
+
+ Assert.assertTrue(protobuf.hasDefaultReplicationConfig());
+ Assert.assertEquals(3,
+ protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
+ .getData());
+ Assert.assertEquals(2,
+ protobuf.getDefaultReplicationConfig().getEcReplicationConfig()
+ .getParity());
+
+ // Reconstruct object from Proto
+ recovered = OmBucketInfo.getFromProtobuf(protobuf);
+ Assert.assertEquals(ReplicationType.EC,
+ recovered.getDefaultReplicationConfig().getType());
+ ECReplicationConfig config =
+ recovered.getDefaultReplicationConfig().getEcReplicationConfig();
+ Assert.assertNotNull(config);
+ Assert.assertEquals(3, config.getData());
+ Assert.assertEquals(2, config.getParity());
+ }
}
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
index 4440ce0,3b04ed4..a6a0ec9
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmKeyInfo.java
@@@ -25,9 -23,9 +25,10 @@@ import org.apache.hadoop.hdds.client.St
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+ import org.apache.hadoop.ozone.ClientVersion;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo.Builder;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.util.Time;
import org.junit.Assert;
@@@ -38,10 -36,6 +39,9 @@@ import java.util.Arrays
import java.util.Collections;
import java.util.List;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.RATIS;
- import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
/**
@@@ -51,57 -45,7 +51,57 @@@ public class TestOmKeyInfo
@Test
public void protobufConversion() {
- OmKeyInfo key =
- createOmKeyInfo(new RatisReplicationConfig(ReplicationFactor.THREE));
- OmKeyInfo key = new Builder()
++ OmKeyInfo key = createOmKeyInfo(
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
+
- OmKeyInfo keyAfterSerialization =
- OmKeyInfo.getFromProtobuf(key.getProtobuf(CURRENT_VERSION));
++ OmKeyInfo keyAfterSerialization = OmKeyInfo.getFromProtobuf(
++ key.getProtobuf(ClientVersion.CURRENT_VERSION));
+
+ Assert.assertEquals(key, keyAfterSerialization);
+ }
+
+ @Test
+ public void getProtobufMessageEC() {
- OmKeyInfo key =
- createOmKeyInfo(new RatisReplicationConfig(ReplicationFactor.THREE));
++ OmKeyInfo key = createOmKeyInfo(
++ RatisReplicationConfig.getInstance(ReplicationFactor.THREE));
+ OzoneManagerProtocolProtos.KeyInfo omKeyProto =
- key.getProtobuf(CURRENT_VERSION);
++ key.getProtobuf(ClientVersion.CURRENT_VERSION);
+
+ // No EC Config
+ Assert.assertFalse(omKeyProto.hasEcReplicationConfig());
+ Assert.assertEquals(THREE, omKeyProto.getFactor());
+ Assert.assertEquals(RATIS, omKeyProto.getType());
+
+ // Reconstruct object from Proto
+ OmKeyInfo recovered = OmKeyInfo.getFromProtobuf(omKeyProto);
+ Assert.assertEquals(RATIS,
+ recovered.getReplicationConfig().getReplicationType());
+ Assert.assertTrue(
+ recovered.getReplicationConfig() instanceof RatisReplicationConfig);
+
+ // EC Config
+ key = createOmKeyInfo(new ECReplicationConfig(3, 2));
- omKeyProto = key.getProtobuf(CURRENT_VERSION);
++ omKeyProto = key.getProtobuf(ClientVersion.CURRENT_VERSION);
+
+ Assert.assertEquals(3, omKeyProto.getEcReplicationConfig().getData());
+ Assert.assertEquals(2, omKeyProto.getEcReplicationConfig().getParity());
+ Assert.assertFalse(omKeyProto.hasFactor());
+ Assert.assertEquals(EC, omKeyProto.getType());
+
+ // Reconstruct object from Proto
+ recovered = OmKeyInfo.getFromProtobuf(omKeyProto);
+ Assert.assertEquals(EC,
+ recovered.getReplicationConfig().getReplicationType());
+ Assert.assertTrue(
+ recovered.getReplicationConfig() instanceof ECReplicationConfig);
+ ECReplicationConfig config =
+ (ECReplicationConfig) recovered.getReplicationConfig();
+ Assert.assertEquals(3, config.getData());
+ Assert.assertEquals(2, config.getParity());
+ }
+
+ private OmKeyInfo createOmKeyInfo(ReplicationConfig replicationConfig) {
+ return new Builder()
.setKeyName("key1")
.setBucketName("bucket")
.setVolumeName("vol1")
@@@ -211,4 -161,4 +211,4 @@@
.setPipeline(pipeline)
.build();
}
--}
++}
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
index 59aeece,ee850bc..c6f8818
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmMultipartKeyInfo.java
@@@ -42,90 -34,22 +42,91 @@@ import static org.junit.Assert.assertNo
public class TestOmMultipartKeyInfo {
@Test
- public void testCopyObject() {
- OmMultipartKeyInfo omMultipartKeyInfo = new OmMultipartKeyInfo.Builder()
- .setUploadID(UUID.randomUUID().toString())
- .setCreationTime(Time.now())
- .setReplicationConfig(RatisReplicationConfig
- .getInstance(HddsProtos.ReplicationFactor.THREE))
+ public void copyObject() {
+ for (ReplicationConfig param : replicationConfigs().collect(toList())) {
+ testCopyObject(param);
+ }
+ }
+
+ //@ParameterizedTest
+ //@MethodSource("replicationConfigs")
+ private void testCopyObject(ReplicationConfig replicationConfig) {
+ // GIVEN
+ OmMultipartKeyInfo subject = createSubject()
+ .setReplicationConfig(replicationConfig)
+ .build();
+
+ // WHEN
+ OmMultipartKeyInfo copy = subject.copyObject();
+
+ // THEN
+ assertNotSame(subject, copy);
+ assertEquals(subject, copy);
+ assertEquals(replicationConfig, copy.getReplicationConfig());
+ }
+
+ @Test
+ public void protoConversion() {
+ for (ReplicationConfig param : replicationConfigs().collect(toList())) {
+ protoConversion(param);
+ }
+ }
+
+ //@ParameterizedTest
+ //@MethodSource("replicationConfigs")
+ private void protoConversion(ReplicationConfig replicationConfig) {
+ // GIVEN
+ OmMultipartKeyInfo subject = createSubject()
+ .setReplicationConfig(replicationConfig)
.build();
- OmMultipartKeyInfo cloneMultipartKeyInfo = omMultipartKeyInfo.copyObject();
+ // WHEN
+ OzoneManagerProtocolProtos.MultipartKeyInfo proto = subject.getProto();
+ OmMultipartKeyInfo fromProto = OmMultipartKeyInfo.getFromProto(proto);
+
+ // THEN
+ assertEquals(subject, fromProto);
+ assertEquals(replicationConfig, fromProto.getReplicationConfig());
+ }
+
+ private static Stream<ReplicationConfig> replicationConfigs() {
+ return Stream.of(
- new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
++ StandaloneReplicationConfig.getInstance(
++ HddsProtos.ReplicationFactor.ONE),
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
+ new ECReplicationConfig(3, 2)
+ );
+ }
+
+ @Test
+ public void distinctListOfParts() {
+ // GIVEN
+ OmMultipartKeyInfo subject = createSubject().build();
+ OmMultipartKeyInfo copy = subject.copyObject();
+
+ // WHEN
+ subject.addPartKeyInfo(1, createPart(createKeyInfo()).build());
+
+ // THEN
+ assertEquals(0, copy.getPartKeyInfoMap().size());
+ assertEquals(1, subject.getPartKeyInfoMap().size());
+ }
- Assert.assertEquals(cloneMultipartKeyInfo, omMultipartKeyInfo);
+ private static OmMultipartKeyInfo.Builder createSubject() {
+ return new OmMultipartKeyInfo.Builder()
+ .setUploadID(UUID.randomUUID().toString())
+ .setCreationTime(Time.now());
+ }
- // Just setting dummy values for this test.
- omMultipartKeyInfo.addPartKeyInfo(1,
- PartKeyInfo.newBuilder().setPartNumber(1).setPartName("/path")
- .setPartKeyInfo(KeyInfo.newBuilder()
+ private static PartKeyInfo.Builder createPart(KeyInfo.Builder partKeyInfo) {
+ return PartKeyInfo.newBuilder()
+ .setPartNumber(1)
+ .setPartName("/path")
+ .setPartKeyInfo(partKeyInfo);
+ }
+
+ private static KeyInfo.Builder createKeyInfo() {
+ return KeyInfo.newBuilder()
.setVolumeName(UUID.randomUUID().toString())
.setBucketName(UUID.randomUUID().toString())
.setKeyName(UUID.randomUUID().toString())
diff --cc hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
index d06932a,0000000..558676d
mode 100644,000000..100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestQuotaUtil.java
@@@ -1,98 -1,0 +1,99 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.helpers;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
++import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.hadoop.hdds.client.ECReplicationConfig.EcCodec.RS;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+
+/**
+ * Tests for the QuotaUtil class.
+ */
+public class TestQuotaUtil {
+
+ private static final int ONE_MB = 1024 * 1024;
+
+ @Test
+ public void testRatisThreeReplication() {
- RatisReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++ ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
+ long replicatedSize =
+ QuotaUtil.getReplicatedSize(123 * ONE_MB, repConfig);
+ Assert.assertEquals(123 * ONE_MB * 3, replicatedSize);
+ }
+
+ @Test
+ public void testRatisOneReplication() {
- RatisReplicationConfig repConfig = new RatisReplicationConfig(ONE);
++ ReplicationConfig repConfig = RatisReplicationConfig.getInstance(ONE);
+ long replicatedSize =
+ QuotaUtil.getReplicatedSize(123 * ONE_MB, repConfig);
+ Assert.assertEquals(123 * ONE_MB, replicatedSize);
+ }
+
+ @Test
+ public void testECFullStripeReplication() {
+ ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
+ long dataSize = ONE_MB * 3 * 123; // 123 full stripe
+ long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
+ Assert.assertEquals(dataSize + 123 * ONE_MB * 2, replicatedSize);
+ }
+
+ @Test
+ public void testECPartialStripeIntoFirstChunk() {
+ ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
+ long dataSize = ONE_MB * 3 * 123 + 10; // 123 full stripes, plus 10 bytes
+ long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
+ // Expected is 123 parity stripes, plus another 10 bytes in each parity
+ Assert.assertEquals(dataSize + 123 * ONE_MB * 2 + 10 * 2, replicatedSize);
+ }
+
+ @Test
+ public void testECPartialStripeBeyondFirstChunk() {
+ ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
+ // 123 full stripes, plus 1MB+10 bytes
+ long dataSize = ONE_MB * 3 * 123 + ONE_MB + 10;
+ long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
+ // Expected is 123 parity stripes, plus another 1MB in each parity
+ Assert.assertEquals(
+ dataSize + 123 * ONE_MB * 2 + ONE_MB * 2, replicatedSize);
+ }
+
+ @Test
+ public void testECPartialSingleStripeFirstChunk() {
+ ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
+ long dataSize = 10;
+ long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
+ // No full stripes: expect just 10 bytes in each of the 2 parity blocks
+ Assert.assertEquals(dataSize + 10 * 2, replicatedSize);
+ }
+
+ @Test
+ public void testECPartialSingleBeyondFirstChunk() {
+ ECReplicationConfig repConfig = new ECReplicationConfig(3, 2, RS, ONE_MB);
+ long dataSize = 2 * ONE_MB + 10;
+ long replicatedSize = QuotaUtil.getReplicatedSize(dataSize, repConfig);
+ // No full stripes: expect a full 1MB chunk in each of the 2 parity blocks
+ Assert.assertEquals(dataSize + ONE_MB * 2, replicatedSize);
+ }
+
+}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
index fea70ac,bf24ae9..4c4bdc6
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystem.java
@@@ -1196,66 -1188,6 +1196,66 @@@ public class TestOzoneFileSystem
}
@Test
+ public void testCreateKeyShouldUseRefreshedBucketReplicationConfig()
+ throws IOException {
+ OzoneBucket bucket =
+ TestDataUtil.createVolumeAndBucket(cluster, bucketLayout);
+ final TestClock testClock = new TestClock(Instant.now(), ZoneOffset.UTC);
+
+ String rootPath = String
+ .format("%s://%s.%s/", OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
+ bucket.getVolumeName());
+
+ // Set the fs.defaultFS and start the filesystem
+ Configuration conf = new OzoneConfiguration(cluster.getConf());
+ conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+ // Set the number of keys to be processed during batch operate.
+ OzoneFileSystem o3FS = (OzoneFileSystem) FileSystem.get(conf);
+
+ //Let's reset the clock to control the time.
+ ((BasicOzoneClientAdapterImpl) (o3FS.getAdapter())).setClock(testClock);
+
+ createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key"),
+ ReplicationType.RATIS);
+
+ bucket.setReplicationConfig(new ECReplicationConfig("rs-3-2-1024k"));
+
+ //After changing the bucket policy, it should create ec key, but o3fs will
+ // refresh after some time. So, it will still be the old type.
+ createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key1"),
+ ReplicationType.RATIS);
+
+ testClock.fastForward(300 * 1000 + 1);
+
+ //After client bucket refresh time, it should create new type what is
+ // available on bucket at that moment.
+ createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key2"),
+ ReplicationType.EC);
+
+ // Rechecking the same steps with changing to Ratis again to check the
+ // behavior is consistent.
+ bucket.setReplicationConfig(
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE));
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE));
+
+ createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key3"),
+ ReplicationType.EC);
+
+ testClock.fastForward(300 * 1000 + 1);
+
+ createKeyAndAssertKeyType(bucket, o3FS, new Path(rootPath, "key4"),
+ ReplicationType.RATIS);
+ }
+
+ private void createKeyAndAssertKeyType(OzoneBucket bucket,
+ OzoneFileSystem o3FS, Path keyPath, ReplicationType expectedType)
+ throws IOException {
+ o3FS.createFile(keyPath).build().close();
+ Assert.assertEquals(expectedType.name(),
+ bucket.getKey(o3FS.pathToKey(keyPath)).getReplicationConfig()
+ .getReplicationType().name());
+ }
+
+ @Test
public void testGetTrashRoots() throws IOException {
String username = UserGroupInformation.getCurrentUser().getShortUserName();
Path trashRoot = new Path(OZONE_URI_DELIMITER, TRASH_PREFIX);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 13802a1,ef561e6..b239621
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -68,15 -67,10 +68,15 @@@ public class TestOzoneConfigurationFiel
".duration"); // Deprecated config
configurationPropsToSkipCompare
.add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
+ // Currently replication and type configs moved to server side.
+ configurationPropsToSkipCompare
+ .add(OzoneConfigKeys.OZONE_REPLICATION);
+ configurationPropsToSkipCompare
+ .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE);
configurationPropsToSkipCompare
- .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION_KEY);
+ .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_KEY);
configurationPropsToSkipCompare
- .add(OzoneConfigKeys.OZONE_OM_CLIENT_PROTOCOL_VERSION);
+ .add(OzoneConfigKeys.OZONE_CLIENT_REQUIRED_OM_VERSION_MIN_DEFAULT);
// This property is tested in TestHttpServer2 instead
xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY);
addPropertiesNotInXml();
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
index 610a34c,0000000..4f7881f
mode 100644,000000..100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestECKeyOutputStream.java
@@@ -1,371 -1,0 +1,371 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.client.rpc;
+
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.OzoneClientConfig;
+import org.apache.hadoop.hdds.scm.cli.ContainerOperationClient;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.BucketArgs;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneClientFactory;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneKeyDetails;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.ECKeyOutputStream;
+import org.apache.hadoop.ozone.client.io.KeyOutputStream;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.container.TestHelper;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+
+/**
+ * Tests key output stream.
+ */
+public class TestECKeyOutputStream {
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf = new OzoneConfiguration();
+ private static OzoneClient client;
+ private static ObjectStore objectStore;
+ private static int chunkSize;
+ private static int flushSize;
+ private static int maxFlushSize;
+ private static int blockSize;
+ private static String volumeName;
+ private static String bucketName;
+ private static String keyString;
+ private static int dataBlocks = 3;
+ private static int inputSize = dataBlocks * chunkSize;
+ private static byte[][] inputChunks = new byte[dataBlocks][chunkSize];
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ chunkSize = 1024;
+ flushSize = 2 * chunkSize;
+ maxFlushSize = 2 * flushSize;
+ blockSize = 2 * maxFlushSize;
+
+ OzoneClientConfig clientConfig = conf.getObject(OzoneClientConfig.class);
+ clientConfig.setChecksumType(ContainerProtos.ChecksumType.NONE);
+ clientConfig.setStreamBufferFlushDelay(false);
+ conf.setFromObject(clientConfig);
+
+ conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
+ // If SCM detects dead node too quickly, then container would be moved to
+ // closed state and all in progress writes will get exception. To avoid
+ // that, we are just keeping higher timeout and none of the tests depending
+ // on deadnode detection timeout currently.
+ conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
+ conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 60, TimeUnit.SECONDS);
+ conf.setTimeDuration("hdds.ratis.raft.server.rpc.slowness.timeout", 300,
+ TimeUnit.SECONDS);
+ conf.setTimeDuration(
+ "hdds.ratis.raft.server.notification.no-leader.timeout", 300,
+ TimeUnit.SECONDS);
+ conf.setQuietMode(false);
+ conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
+ StorageUnit.MB);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 500,
+ TimeUnit.MILLISECONDS);
+ conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
+ TimeUnit.SECONDS);
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(10)
+ .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
+ .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
+ .setStreamBufferMaxSize(maxFlushSize)
+ .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
+ cluster.waitForClusterToBeReady();
+ client = OzoneClientFactory.getRpcClient(conf);
+ objectStore = client.getObjectStore();
+ keyString = UUID.randomUUID().toString();
+ volumeName = "testeckeyoutputstream";
+ bucketName = volumeName;
+ objectStore.createVolume(volumeName);
+ objectStore.getVolume(volumeName).createBucket(bucketName);
+ initInputChunks();
+ }
+
+ /**
+ * Shut down the MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testCreateKeyWithECReplicationConfig() throws Exception {
+ try (OzoneOutputStream key = TestHelper
+ .createKey(keyString, new ECReplicationConfig(3, 2,
+ ECReplicationConfig.EcCodec.RS, chunkSize), inputSize,
+ objectStore, volumeName, bucketName)) {
+ Assert.assertTrue(key.getOutputStream() instanceof ECKeyOutputStream);
+ }
+ }
+
+ @Test
+ public void testCreateKeyWithOutBucketDefaults() throws Exception {
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ OzoneBucket bucket = volume.getBucket(bucketName);
+ try (OzoneOutputStream out = bucket.createKey("myKey", inputSize)) {
+ Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ }
+
+ @Test
+ public void testCreateKeyWithBucketDefaults() throws Exception {
+ String myBucket = UUID.randomUUID().toString();
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+ bucketArgs.setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize)));
+
+ volume.createBucket(myBucket, bucketArgs.build());
+ OzoneBucket bucket = volume.getBucket(myBucket);
+
+ try (OzoneOutputStream out = bucket.createKey(keyString, inputSize)) {
+ Assert.assertTrue(out.getOutputStream() instanceof ECKeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ byte[] buf = new byte[chunkSize];
+ try (OzoneInputStream in = bucket.readKey(keyString)) {
+ for (int i = 0; i < inputChunks.length; i++) {
+ int read = in.read(buf, 0, chunkSize);
+ Assert.assertEquals(chunkSize, read);
+ Assert.assertTrue(Arrays.equals(buf, inputChunks[i]));
+ }
+ }
+ }
+
+ @Test
+ public void testCreateRatisKeyAndWithECBucketDefaults() throws Exception {
+ OzoneBucket bucket = getOzoneBucket();
- try (OzoneOutputStream out = bucket
- .createKey("testCreateRatisKeyAndWithECBucketDefaults", 2000,
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
- new HashMap<>())) {
++ try (OzoneOutputStream out = bucket.createKey(
++ "testCreateRatisKeyAndWithECBucketDefaults", 2000,
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
++ new HashMap<>())) {
+ Assert.assertTrue(out.getOutputStream() instanceof KeyOutputStream);
+ for (int i = 0; i < inputChunks.length; i++) {
+ out.write(inputChunks[i]);
+ }
+ }
+ }
+
+ @Test
+ public void test13ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(13);
+ }
+
+ @Test
+ public void test15ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(15);
+ }
+
+ @Test
+ public void test20ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(20);
+ }
+
+ @Test
+ public void test21ChunksInSingleWriteOp() throws IOException {
+ testMultipleChunksInSingleWriteOp(21);
+ }
+
+ public void testMultipleChunksInSingleWriteOp(int numChunks)
+ throws IOException {
+ byte[] inputData = getInputBytes(numChunks);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = "testMultipleChunksInSingleWriteOp" + numChunks;
+ try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ out.write(inputData);
+ }
+
+ validateContent(inputData, bucket, bucket.getKey(keyName));
+ }
+
+ @Test
+ public void testECContainerKeysCount()
+ throws IOException, InterruptedException, TimeoutException {
+ byte[] inputData = getInputBytes(1);
+ final OzoneBucket bucket = getOzoneBucket();
+ ContainerOperationClient containerOperationClient =
+ new ContainerOperationClient(conf);
+
+ ECReplicationConfig repConfig = new ECReplicationConfig(
+ 3, 2, ECReplicationConfig.EcCodec.RS, chunkSize);
+ // Close all EC pipelines so we must get a fresh pipeline and hence
+ // container for this test.
+ PipelineManager pm =
+ cluster.getStorageContainerManager().getPipelineManager();
+ for (Pipeline p : pm.getPipelines(repConfig)) {
+ pm.closePipeline(p, true);
+ }
+
+ String keyName = UUID.randomUUID().toString();
+ try (OzoneOutputStream out = bucket.createKey(keyName, 4096,
+ repConfig, new HashMap<>())) {
+ out.write(inputData);
+ }
+ OzoneKeyDetails key = bucket.getKey(keyName);
+ long currentKeyContainerID =
+ key.getOzoneKeyLocations().get(0).getContainerID();
+
+ GenericTestUtils.waitFor(() -> {
+ try {
+ return containerOperationClient.getContainer(currentKeyContainerID)
+ .getNumberOfKeys() == 1;
+ } catch (IOException exception) {
+ Assert.fail("Unexpected exception " + exception);
+ return false;
+ }
+ }, 100, 10000);
+ validateContent(inputData, bucket, key);
+ }
+
+ private void validateContent(byte[] inputData, OzoneBucket bucket,
+ OzoneKey key) throws IOException {
+ try (OzoneInputStream is = bucket.readKey(key.getName())) {
+ byte[] fileContent = new byte[inputData.length];
+ Assert.assertEquals(inputData.length, is.read(fileContent));
+ Assert.assertEquals(new String(inputData, UTF_8),
+ new String(fileContent, UTF_8));
+ }
+ }
+
+ private OzoneBucket getOzoneBucket() throws IOException {
+ String myBucket = UUID.randomUUID().toString();
+ OzoneVolume volume = objectStore.getVolume(volumeName);
+ final BucketArgs.Builder bucketArgs = BucketArgs.newBuilder();
+ bucketArgs.setDefaultReplicationConfig(
+ new DefaultReplicationConfig(ReplicationType.EC,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize)));
+
+ volume.createBucket(myBucket, bucketArgs.build());
+ return volume.getBucket(myBucket);
+ }
+
+ private static void initInputChunks() {
+ for (int i = 0; i < dataBlocks; i++) {
+ inputChunks[i] = getBytesWith(i + 1, chunkSize);
+ }
+ }
+
+ private static byte[] getBytesWith(int singleDigitNumber, int total) {
+ StringBuilder builder = new StringBuilder(singleDigitNumber);
+ for (int i = 1; i <= total; i++) {
+ builder.append(singleDigitNumber);
+ }
+ return builder.toString().getBytes(UTF_8);
+ }
+
+ @Test
+ public void testWriteShouldSucceedWhenDNKilled() throws Exception {
+ int numChunks = 3;
+ byte[] inputData = getInputBytes(numChunks);
+ final OzoneBucket bucket = getOzoneBucket();
+ String keyName = "testWriteShouldSucceedWhenDNKilled" + numChunks;
+ DatanodeDetails nodeToKill = null;
+ try {
+ try (OzoneOutputStream out = bucket.createKey(keyName, 1024,
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS,
+ chunkSize), new HashMap<>())) {
+ out.write(inputData);
+ // Kill a node from first pipeline
+ nodeToKill =
+ ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
+ .get(0).getPipeline().getFirstNode();
+ cluster.shutdownHddsDatanode(nodeToKill);
+
+ out.write(inputData);
+ // Check the second blockGroup pipeline to make sure that the failed
+ // node is not selected.
+ Assert.assertFalse(
+ ((ECKeyOutputStream) out.getOutputStream()).getStreamEntries()
+ .get(1).getPipeline().getNodes().contains(nodeToKill));
+ }
+
+ try (OzoneInputStream is = bucket.readKey(keyName)) {
+ // We wrote "inputData" twice, so do two reads and ensure the correct
+ // data comes back.
+ for (int i = 0; i < 2; i++) {
+ byte[] fileContent = new byte[inputData.length];
+ Assert.assertEquals(inputData.length, is.read(fileContent));
+ Assert.assertEquals(new String(inputData, UTF_8),
+ new String(fileContent, UTF_8));
+ }
+ }
+ } finally {
+ cluster.restartHddsDatanode(nodeToKill, true);
+ }
+ }
+
+ private byte[] getInputBytes(int numChunks) {
+ byte[] inputData = new byte[numChunks * chunkSize];
+ for (int i = 0; i < numChunks; i++) {
+ int start = (i * chunkSize);
+ Arrays.fill(inputData, start, start + chunkSize - 1,
+ String.valueOf(i % 9).getBytes(UTF_8)[0]);
+ }
+ return inputData;
+ }
+
+}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 6e271fc,6272afa..37dec7c
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@@ -3899,47 -3798,4 +3922,49 @@@ public abstract class TestOzoneRpcClien
createRequiredForVersioningTest(volumeName, bucketName, keyName, true);
checkExceptedResultForVersioningTest(volumeName, bucketName, keyName, 2);
}
+
+ @Test
+ public void testSetECReplicationConfigOnBucket()
+ throws IOException {
+ String volumeName = UUID.randomUUID().toString();
+ store.createVolume(volumeName);
+ OzoneVolume volume = store.getVolume(volumeName);
+ OzoneBucket bucket = getBucket(volume);
+ ReplicationConfig currentReplicationConfig = bucket.getReplicationConfig();
+ Assert.assertEquals(
- new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE),
++ StandaloneReplicationConfig.getInstance(
++ HddsProtos.ReplicationFactor.ONE),
+ currentReplicationConfig);
+ ECReplicationConfig ecReplicationConfig =
+ new ECReplicationConfig(3, 2, ECReplicationConfig.EcCodec.RS, 1024);
+ bucket.setReplicationConfig(ecReplicationConfig);
+
+ // Get the bucket and check the updated config.
+ bucket = volume.getBucket(bucket.getName());
+
+ Assert.assertEquals(ecReplicationConfig, bucket.getReplicationConfig());
+
+ RatisReplicationConfig ratisReplicationConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ bucket.setReplicationConfig(ratisReplicationConfig);
+
+ // Get the bucket and check the updated config.
+ bucket = volume.getBucket(bucket.getName());
+
+ Assert.assertEquals(ratisReplicationConfig, bucket.getReplicationConfig());
+
+ //Reset replication config back.
+ bucket.setReplicationConfig(currentReplicationConfig);
+ }
+
+ private OzoneBucket getBucket(OzoneVolume volume) throws IOException {
+ String bucketName = UUID.randomUUID().toString();
+ BucketArgs.Builder builder = BucketArgs.newBuilder();
+ builder.setVersioning(true).setDefaultReplicationConfig(
+ new DefaultReplicationConfig(
- new StandaloneReplicationConfig(HddsProtos.ReplicationFactor.ONE)));
++ StandaloneReplicationConfig.getInstance(
++ HddsProtos.ReplicationFactor.ONE)));
+ volume.createBucket(bucketName, builder.build());
+ return volume.getBucket(bucketName);
+ }
}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
index dcc03bc,0000000..8532835
mode 100644,000000..100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestBlockInputStreamFactoryImpl.java
@@@ -1,111 -1,0 +1,111 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.client.rpc.read;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.storage.BlockExtendedInputStream;
+import org.apache.hadoop.hdds.scm.storage.BlockInputStream;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactory;
+import org.apache.hadoop.ozone.client.io.BlockInputStreamFactoryImpl;
+import org.apache.hadoop.ozone.client.io.ECBlockInputStreamProxy;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for BlockInputStreamFactoryImpl.
+ */
+public class TestBlockInputStreamFactoryImpl {
+
+ @Test
+ public void testNonECGivesBlockInputStream() {
+ BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
+ ReplicationConfig repConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+
+ OmKeyLocationInfo blockInfo = createKeyLocationInfo(repConfig, 3,
+ 1024 * 1024 * 10);
+
+ BlockExtendedInputStream stream =
+ factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
+ blockInfo.getToken(), true, null, null);
+ Assert.assertTrue(stream instanceof BlockInputStream);
+ Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
+ Assert.assertEquals(stream.getLength(), blockInfo.getLength());
+ }
+
+ @Test
+ public void testECGivesECBlockInputStream() {
+ BlockInputStreamFactory factory = new BlockInputStreamFactoryImpl();
+ ReplicationConfig repConfig =
+ new ECReplicationConfig(3, 2);
+
+ OmKeyLocationInfo blockInfo =
+ createKeyLocationInfo(repConfig, 5, 1024 * 1024 * 10);
+
+ BlockExtendedInputStream stream =
+ factory.create(repConfig, blockInfo, blockInfo.getPipeline(),
+ blockInfo.getToken(), true, null, null);
+ Assert.assertTrue(stream instanceof ECBlockInputStreamProxy);
+ Assert.assertEquals(stream.getBlockID(), blockInfo.getBlockID());
+ Assert.assertEquals(stream.getLength(), blockInfo.getLength());
+ }
+
+ private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
+ long blockLength, Map<DatanodeDetails, Integer> dnMap) {
+
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setState(Pipeline.PipelineState.CLOSED)
+ .setId(PipelineID.randomId())
+ .setNodes(new ArrayList<>(dnMap.keySet()))
+ .setReplicaIndexes(dnMap)
+ .setReplicationConfig(repConf)
+ .build();
+
+ OmKeyLocationInfo keyInfo = new OmKeyLocationInfo.Builder()
+ .setBlockID(new BlockID(1, 1))
+ .setLength(blockLength)
+ .setOffset(0)
+ .setPipeline(pipeline)
+ .setPartNumber(0)
+ .build();
+ return keyInfo;
+ }
+
+ private OmKeyLocationInfo createKeyLocationInfo(ReplicationConfig repConf,
+ int nodeCount, long blockLength) {
+ Map<DatanodeDetails, Integer> datanodes = new HashMap<>();
+ for (int i = 0; i < nodeCount; i++) {
+ datanodes.put(MockDatanodeDetails.randomDatanodeDetails(), i + 1);
+ }
+ return createKeyLocationInfo(repConf, blockLength, datanodes);
+ }
+
+}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
index 7ab6f3f,7f0ab38..07c2433
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/read/TestInputStreamBase.java
@@@ -171,14 -169,8 +171,14 @@@ public abstract class TestInputStreamBa
}
byte[] writeKey(String keyName, int dataLength) throws Exception {
- ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++ ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
+ return writeKey(keyName, repConfig, dataLength);
+ }
+
+ byte[] writeKey(String keyName, ReplicationConfig repConfig, int dataLength)
+ throws Exception {
OzoneOutputStream key = TestHelper.createKey(keyName,
- ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+ repConfig, 0, objectStore, volumeName, bucketName);
byte[] inputData = ContainerTestHelper.getFixedLengthString(
keyString, dataLength).getBytes(UTF_8);
@@@ -190,15 -182,8 +190,15 @@@
byte[] writeRandomBytes(String keyName, int dataLength)
throws Exception {
- ReplicationConfig repConfig = new RatisReplicationConfig(THREE);
++ ReplicationConfig repConfig = RatisReplicationConfig.getInstance(THREE);
+ return writeRandomBytes(keyName, repConfig, dataLength);
+ }
+
+ byte[] writeRandomBytes(String keyName, ReplicationConfig repConfig,
+ int dataLength)
+ throws Exception {
OzoneOutputStream key = TestHelper.createKey(keyName,
- ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+ repConfig, 0, objectStore, volumeName, bucketName);
byte[] inputData = new byte[dataLength];
RAND.nextBytes(inputData);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 63e683f,7a4e1c3..dae6e38
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@@ -20,10 -20,14 +20,16 @@@ package org.apache.hadoop.ozone.contain
import java.io.IOException;
import java.security.MessageDigest;
- import java.util.*;
+ import java.util.ArrayList;
+ import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.HashMap;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Set;
import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index 689214f,cd6af31..4b51107
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@@ -191,10 -179,10 +192,10 @@@ public class TestRandomKeyGenerator
}
@Test
- @org.junit.Ignore("HDDS-5993")
- public void testCleanObjects() throws Exception {
+ @Flaky("HDDS-5993")
+ public void cleanObjectsTest() throws Exception {
RandomKeyGenerator randomKeyGenerator =
- new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
+ new RandomKeyGenerator(cluster.getConf());
randomKeyGenerator.setNumOfVolumes(2);
randomKeyGenerator.setNumOfBuckets(5);
randomKeyGenerator.setNumOfKeys(10);
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
index f56b292,9b9d6d8..7ff190a
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmMetrics.java
@@@ -571,8 -569,6 +571,8 @@@ public class TestOmMetrics
.setBucketName(bucketName)
.setKeyName(keyName)
.setAcls(Lists.emptyList())
- .setReplicationConfig(
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE))
++ .setReplicationConfig(RatisReplicationConfig.getInstance(
++ HddsProtos.ReplicationFactor.THREE))
.build();
}
private OmVolumeArgs createVolumeArgs() {
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
index 54781c2,32dbe04..5e2ef2d
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyCommitRequestWithFSO.java
@@@ -175,11 -179,12 +179,11 @@@ public class OMKeyCommitRequestWithFSO
// AllocatedBlock. The space occupied by the Key shall be based on
// the actual Key size, and the total Block size applied before should
// be subtracted.
- long correctedSpace = omKeyInfo.getDataSize() * factor -
+ long correctedSpace = omKeyInfo.getReplicatedSize() -
- locationInfoList.size() * scmBlockSize * factor;
+ allocatedLocationInfoList.size() * scmBlockSize * factor;
// Subtract the size of blocks to be overwritten.
if (keyToDelete != null) {
- correctedSpace -= keyToDelete.getDataSize() *
- keyToDelete.getReplicationConfig().getRequiredNodes();
+ correctedSpace -= keyToDelete.getReplicatedSize();
}
omBucketInfo.incrUsedBytes(correctedSpace);
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
index cb69f27,1b327d2..48bcf46
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/key/OMKeyRequest.java
@@@ -32,14 -32,15 +32,17 @@@ import java.util.Map
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+ import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+ import org.apache.hadoop.ozone.OmUtils;
import org.apache.hadoop.ozone.OzoneAcl;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.om.PrefixManager;
import org.apache.hadoop.ozone.om.ResolvedBucket;
import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
++import org.apache.hadoop.ozone.om.helpers.BucketLayout;
import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
@@@ -48,8 -49,8 +51,8 @@@ import org.apache.hadoop.ozone.om.helpe
import org.apache.hadoop.ozone.om.helpers.OmPrefixInfo;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
--import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.QuotaUtil;
+ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
import org.apache.hadoop.ozone.om.request.file.OMFileRequest;
import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
diff --cc hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
index 7dc07603,0000000..c888ee7
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOzoneConfigUtil.java
@@@ -1,105 -1,0 +1,105 @@@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.client.DefaultReplicationConfig;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationFactor;
+import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the server side replication config preference logic.
+ */
+public class TestOzoneConfigUtil {
+ private ReplicationConfig ratis3ReplicationConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private HddsProtos.ReplicationType noneType = HddsProtos.ReplicationType.NONE;
+ private HddsProtos.ReplicationFactor zeroFactor =
+ HddsProtos.ReplicationFactor.ZERO;
+ private HddsProtos.ECReplicationConfig clientECReplicationConfig =
+ new ECReplicationConfig("rs-3-2-1024K").toProto();
+ private DefaultReplicationConfig bucketECConfig =
+ new DefaultReplicationConfig(
+ new ECReplicationConfig(clientECReplicationConfig));
+
+ /**
+ * Tests EC bucket defaults.
+ */
+ @Test
+ public void testResolveClientSideRepConfigWhenBucketHasEC() {
+ ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(noneType, zeroFactor,
+ clientECReplicationConfig, bucketECConfig, ratis3ReplicationConfig);
+ // Client has no preference, so we should get the bucket defaults we passed.
+ Assert.assertEquals(bucketECConfig.getEcReplicationConfig(),
+ replicationConfig);
+ }
+
+ /**
+ * Tests server defaults.
+ */
+ @Test
+ public void testResolveClientSideRepConfigWithNoClientAndBucketDefaults() {
+ ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(noneType, zeroFactor,
+ clientECReplicationConfig, null, ratis3ReplicationConfig);
+ // Client has no preference, no bucket defaults, so it should return server
+ // defaults.
+ Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
+ }
+
+ /**
+ * Tests client preference of EC.
+ */
+ @Test
+ public void testResolveClientSideRepConfigWhenClientPassEC() {
+ ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(HddsProtos.ReplicationType.EC,
+ zeroFactor, clientECReplicationConfig, null,
+ ratis3ReplicationConfig);
+ // Client has preference of type EC, no bucket defaults, so it should return
+ // client preference.
+ Assert.assertEquals(new ECReplicationConfig("rs-3-2-1024K"),
+ replicationConfig);
+ }
+
+ /**
+ * Tests bucket ratis defaults.
+ */
+ @Test
+ public void testResolveClientSideRepConfigWhenBucketHasEC3() {
+ DefaultReplicationConfig ratisBucketDefaults =
+ new DefaultReplicationConfig(ReplicationType.RATIS,
+ ReplicationFactor.THREE);
+ ReplicationConfig replicationConfig = OzoneConfigUtil
+ .resolveReplicationConfigPreference(noneType, zeroFactor,
+ clientECReplicationConfig, ratisBucketDefaults,
+ ratis3ReplicationConfig);
+ // Client has no preference of type and bucket has ratis defaults, so it
+ // should return ratis.
+ Assert.assertEquals(ratisBucketDefaults.getType().name(),
+ replicationConfig.getReplicationType().name());
+ Assert.assertEquals(ratisBucketDefaults.getFactor(),
+ ReplicationFactor.valueOf(replicationConfig.getRequiredNodes()));
+ }
+
+}
diff --cc hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
index f977577,a9d4f98..561cc34
--- a/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
+++ b/hadoop-ozone/ozonefs-common/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneClientUtils.java
@@@ -37,16 -29,9 +37,16 @@@ import static org.junit.Assert.assertNu
import static org.mockito.Mockito.mock;
/**
- * Unit tests for OzoneClientUtils.
+ * Tests the behavior of OzoneClientUtils APIs.
*/
public class TestOzoneClientUtils {
+ private ReplicationConfig ecReplicationConfig =
+ new ECReplicationConfig("rs-3-2-1024K");
+ private ReplicationConfig ratis3ReplicationConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private ReplicationConfig ratis1ReplicationConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
+
@Test(expected = IllegalArgumentException.class)
public void testNegativeLength() throws IOException {
OzoneVolume volume = mock(OzoneVolume.class);
diff --cc hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 96bfe73,145578b..99a08d6
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@@ -174,27 -169,22 +174,28 @@@ public class ObjectEndpoint extends End
partNumber, uploadID, body);
}
+ String copyHeader = null, storageType = null;
try {
- String copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
- String storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
+ copyHeader = headers.getHeaderString(COPY_SOURCE_HEADER);
+ storageType = headers.getHeaderString(STORAGE_CLASS_HEADER);
- S3StorageType s3StorageType;
- boolean storageTypeDefault;
+ // Normal put object
+ OzoneBucket bucket = getBucket(bucketName);
+ ReplicationConfig clientConfiguredReplicationConfig = null;
+ String replication = ozoneConfiguration.get(OZONE_REPLICATION);
+ if (replication != null) {
+ clientConfiguredReplicationConfig = ReplicationConfig.parse(
+ ReplicationType.valueOf(ozoneConfiguration
+ .get(OZONE_REPLICATION_TYPE, OZONE_REPLICATION_TYPE_DEFAULT)),
+ replication, ozoneConfiguration);
+ }
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(storageType,
+ clientConfiguredReplicationConfig, bucket.getReplicationConfig());
+ boolean storageTypeDefault = false;
if (storageType == null || storageType.equals("")) {
- s3StorageType = S3StorageType.getDefault(ozoneConfiguration);
storageTypeDefault = true;
- } else {
- s3StorageType = toS3StorageType(storageType);
- storageTypeDefault = false;
}
- ReplicationType replicationType = s3StorageType.getType();
- ReplicationFactor replicationFactor = s3StorageType.getFactor();
if (copyHeader != null) {
//Copy object, as copy source available.
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
index 0f42b3d,ad12e65..fcddfde
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/OzoneBucketStub.java
@@@ -80,7 -79,8 +80,7 @@@ public class OzoneBucketStub extends Oz
long creationTime) {
super(volumeName,
bucketName,
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE),
- StandaloneReplicationConfig
- .getInstance(HddsProtos.ReplicationFactor.ONE),
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE),
storageType,
versioning,
creationTime);
diff --cc hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
index 55f2f38,0000000..6b8c9fc
mode 100644,000000..100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestS3Utils.java
@@@ -1,143 -1,0 +1,143 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.s3.util;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.s3.exception.OS3Exception;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests the S3Utils APIs.
+ */
+public class TestS3Utils {
+ private ReplicationConfig ecReplicationConfig =
+ new ECReplicationConfig("rs-3-2-1024K");
+ private ReplicationConfig ratis3ReplicationConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.THREE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.THREE);
+ private ReplicationConfig ratis1ReplicationConfig =
- new RatisReplicationConfig(HddsProtos.ReplicationFactor.ONE);
++ RatisReplicationConfig.getInstance(HddsProtos.ReplicationFactor.ONE);
+
+ @Test
+ public void testResolveClientSideRepConfigWhenBucketHasEC()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
+ null, ecReplicationConfig);
+ // Bucket default is EC.
+ Assert.assertEquals(ecReplicationConfig, replicationConfig);
+ }
+
+ /**
+ * When bucket replication is null, it should respect the user passed value.
+ */
+ @Test
+ public void testResolveClientSideRepConfigWhenBucketHasNull()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
+ null, null);
+ // Passed replication is 3 - Ozone mapped replication is ratis THREE
+ Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
+ }
+
+ /**
+ * When bucket replication is null, it should return null if the user passed
+ * value is empty.
+ */
+ @Test
+ public void testResolveClientSideRepConfigWhenUserPassedReplicationIsEmpty()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig =
+ S3Utils.resolveS3ClientSideReplicationConfig("", null, null);
+ // client configured value also null.
+ // This API caller should leave the decision to server.
+ Assert.assertNull(replicationConfig);
+ }
+
+ /**
+ * When bucket default is non-EC and client side values are not valid, we
+ * would just return null, so servers can make decision in this case.
+ */
+ @Test
+ public void testResolveRepConfWhenUserPassedIsInvalidButBucketDefaultNonEC()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(null, null,
+ ratis3ReplicationConfig);
+ // Configured client config also null.
+ Assert.assertNull(replicationConfig);
+ }
+
+ /**
+ * When bucket default is non-EC and client side value is valid, we
+ * should return the client side valid value.
+ */
+ @Test
+ public void testResolveRepConfWhenUserPassedIsValidButBucketDefaultNonEC()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(
+ S3StorageType.REDUCED_REDUNDANCY.name(), null,
+ ratis3ReplicationConfig);
+ // Passed value is replication one - Ozone mapped value is ratis ONE
+ Assert.assertEquals(ratis1ReplicationConfig, replicationConfig);
+ }
+
+ /**
+ * When bucket default is EC and client side value also valid, we would just
+ * return bucket default EC.
+ */
+ @Test
+ public void testResolveRepConfWhenUserPassedIsValidButBucketDefaultEC()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(S3StorageType.STANDARD.name(),
+ ratis3ReplicationConfig, ecReplicationConfig);
+ // Bucket default is EC
+ Assert.assertEquals(ecReplicationConfig, replicationConfig);
+ }
+
+ /**
+ * When bucket default is non-EC and client side passed value also not valid
+ * but configured value is valid, we would just return configured value.
+ */
+ @Test
+ public void testResolveRepConfWhenUserPassedIsInvalidAndBucketDefaultNonEC()
+ throws OS3Exception {
+ ReplicationConfig replicationConfig = S3Utils
+ .resolveS3ClientSideReplicationConfig(null, ratis3ReplicationConfig,
+ ratis1ReplicationConfig);
+ // Configured value is ratis THREE
+ Assert.assertEquals(ratis3ReplicationConfig, replicationConfig);
+ }
+
+ /**
+ * When the client side passed storage type is not a recognised value, an
+ * OS3Exception is expected, whatever the configured and bucket defaults are.
+ */
+ @Test(expected = OS3Exception.class)
+ public void testResolveRepConfWhenUserPassedIsInvalid() throws OS3Exception {
+ S3Utils.resolveS3ClientSideReplicationConfig("INVALID",
+ ratis3ReplicationConfig, ratis1ReplicationConfig);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org