Posted to commits@ozone.apache.org by sa...@apache.org on 2019/11/25 07:16:47 UTC

[hadoop-ozone] branch HDDS-1564 updated (07e5348 -> 32af34f)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a change to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git.


    omit 07e5348  HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
    omit 042062a  HDDS-2089: Add createPipeline CLI. (#1418)
    omit c82339c  HDDS-1571. Create an interface for pipeline placement policy to support network topologies. (#1395)
    omit ab1c6d6  HDDS-1577. Add default pipeline placement policy implementation. (#1366)
     add e91cc32  HDDS-2272. Avoid buffer copying in GrpcReplicationClient
     add 9857b0c  HDDS-2291. Acceptance tests for OM HA
     add 6b2cda1  HDDS-1515. Create ozone dev-support script to check hadolint violations
     add d718146  HDDS-2255. Improve Acl Handler Messages (#94)
     add f162c05  HDDS-2321. Ozone Block Token verify should not apply to all datanode … (#110)
     add 9565cc5  HDDS-2359. Seeking randomly in a key with more than 2 blocks of data leads to inconsistent reads (#82)
     add 21e5761  HDDS-2064. Add tests for incorrect OM HA config when node ID or RPC address is not configured (#119)
     add 33b374a  HDDS-1643. Send hostName also part of OMRequest. (#70)
     add a0dd918  HDDS-2407. Reduce log level of per-node failure in XceiverClientGrpc (#120)
     add 615571e  HDDS-2270. Avoid buffer copying in ContainerStateMachine.loadSnapshot/persistContainerSet (#118)
     add 06c70f7  HDDS-2377. Speed up TestOzoneManagerHA#testOMRetryProxy and #testTwoOMNodesDown (#99)
     add 384fee0  HDDS-2369. Fix typo in param description.
     add dc3a3c3  HDDS-2327. Provide new Freon test to test Ratis pipeline with pure XceiverClientRatis
     add d893cd3  HDDS-2370. Support Ozone HddsDatanodeService run as plugin with HDFS Datanode
     add ab7987c  HDDS-2404. Added support for Registered id as service identifier for CSR. Based on the discussion with reviewer, otherName field makes more sense than registeredId.
     add 8e00c2e  HDDS-2395. Handle completeMPU scenarios to match with aws s3 behavior. (#109)
     add f2f97ea  HDDS-2399. Update mailing list information. (#126)
     add 6a450fb  HDDS-2427. Exclude webapps from hadoop-ozone-filesystem-lib-current uber jar
     add 9ce2dd7  HDDS-1701. Move dockerbin script to libexec
     add f928a0b  HDDS-2273. Avoid buffer copying in GrpcReplicationService. Contributed by Attila Doroszlai
     add 93d0db4  HDDS-2410. Ozoneperf docker cluster should use privileged containers (#124)
     add f7ba616  HDDS-2417 Add the list trash command to the client side (#138)
     add a6f80c0  HDDS-2325. BenchMarkDatanodeDispatcher genesis test is failing with NPE
     add 3b0a49e  HDDS-2415. Completely disable tracer if hdds.tracing.enabled=false
     add ee8b89a  HDDS-1868. Ozone pipelines should be marked as ready only after the leader election is complete. (#23)
     add 1c1aecf  HDDS-2456. Add explicit base image version for images derived from ozone-runner
     add 1e71b63  HDDS-2462. Add jq dependency in Contribution guideline (#145)
     add 2211d63  HDDS-2460. Default checksum type is wrong in description
     add c199a29  HDDS-2451. Use lazy string evaluation in preconditions
     add 4337bb0  HDDS-2403. Remove leftover reference to OUTPUT_FILE from shellcheck.sh
     add ef2a9f4  HDDS-2445. Replace ToStringBuilder in BlockData
     add d816b64  HDDS-2452: Wrong condition for re-scheduling in ReportPublisher (#151)
     add 1f8c66a  HDDS-2463. Reduce unnecessary getServiceInfo calls. Contributed by Xiaoyu Yao. (#146)
     add fe85445  HDDS-2464. Avoid unnecessary allocations for FileChannel.open call. (#147)
     add 42b6909  HDDS-2458. Avoid list copy in ChecksumData. (#141)
     add 86c76ff  HDDS-2400. Enable github actions based builds for Ozone (#122)
     add cb66b48  HDDS-2412. Define description/topics/merge strategy for the github repository (#125)
     add c83d5bc  HDDS-2364. Add OM metrics to find the false positive rate for the keyMayExist. (#101)
     add e39134d  HDDS-1847: Datanode Kerberos principal and keytab config key looks inconsistent (#115)
     add 659385e  HDDS-2469. Avoid changing client-side key metadata (#154)
     add 528cea3  HDDS-1940. Closing open container via scmcli gives false error message. (#153)
     add e350aef  HDDS-2478. Sonar : remove temporary variable in XceiverClientGrpc sendCommand (#165)
     add 49dbb18  HDDS-2480. Sonar : remove log spam for exceptions inside XceiverClientGrpc reconnect (#170)
     add 287b322  HDDS-2479. Sonar : replace instanceof with catch block in XceiverClientGrpc sendCommandWithRetry (#168)
     add 8a11fe3  HDDS-2450 Datanode ReplicateContainer thread pool should be configurable (#134)
     add d0fd848  HDDS-2473. Fix code reliability issues found by Sonar in Ozone Recon module. (#162)
     add 0d86870  HDDS-2481. Close streams in TarContainerPacker (#167)
     add 89d11ad  HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)
     add 0e403a5  HDDS-2482. Enable github workflow actions for pull requests (#171)
     add 37dc306  HDDS-2494 Sonar BigDecimal Cleanup (#175)
     add 3869296  HDDS-2487. Ensure streams are closed (#173)
     add 3ca37b1  HDDS-2492 Fix test clean up issue in TestSCMPipelineManager. (#179)
     add a1ea8a4  HDDS-2468. scmcli close pipeline command not working
     add d05ff4e  HDDS-2471. Improve exception message for CompleteMultipartUpload
     add aa828b1  HDDS-2470. Add partName, partNumber for CommitMultipartUpload
     add 5264882  HDDS-2495. Sonar - "notify" may not wake up the appropriate thread
     add 43be5c8  HDDS-2472. Use try-with-resources while creating FlushOptions in RDBStore (#161)
     add 2f94fec  Fix sonar warnings in HddsUtils (#184)
     add bd0d873  HDDS-2418 Add the list trash command to the server side handling (#143)
     add 9f57785  HDDS-2375. Refactor BlockOutputStream to allow flexible buffering. (#97)
     add faedad7  HDDS-2500. Avoid fall-through in CloseContainerCommandHandler (#186)
     add baf6cef  HDDS-2515. No need of toString since log formatter can internally handle that. (#190)
     add 6fd2ef0  HDDS-2511. Fix Sonar issues in OzoneManagerServiceProviderImpl. (#188)
     add 5d51385  HDDS-2507. Remove the hard-coded exclusion of TestMiniChaosOzoneCluster (#182)
     add ee6f22d  HDDS-2502. Close ScmClient in RatisInsight. (#199)
     add 79e3ff9  HDDS-2405. int2ByteString unnecessary byte array allocation (#197)
     add 493a916  HDDS-2501. Sonar: Fix issues found in the ObjectEndpoint class. (#198)
     add dba972d  HDDS-2513. Removed unused private field (#187)
     add b01c7c9  HDDS-2461. Logging by ChunkUtils is misleading (#144)
     add 1b72718  HDDS-2503. Close FlushOptions in RDBStore (#181)
     add a9a9f4f  HDDS-2489. Change anonymous class based initialization in HddsUtils. (#172)
     add 8af5ab8  HDDS-2525. Sonar : replace lambda with method reference in SCM BufferPool. (#210)
     add 89bdb6a  HDDS-2521. Multipart upload failing with NPE
     add 6be3c84  HDDS-2524. Sonar : clumsy error handling in BlockOutputStream validateResponse. (#209)
     add 4ba4065  HDDS-2526. Sonar : use format specifiers in Log inside HddsConfServlet (#211)
     add b411439  HDDS-2520. Sonar: Avoid temporary variable scmSecurityClient (#208)
     add d52fb26  HDDS-2488. Not enough arguments for log messages in GrpcXceiverService. (#191)
     add 0b5df11  HDDS-2499. IsLeader information is lost when update pipeline state. (#180)
     add ecb5bf4  HDDS-2490. Fixing sonarcloud errors. (#217)
     add 4a4b03c  HDDS-2527. Sonar: remove redundant temporary assignment in HddsVersionProvider (#219)
     add b8819c3  HDDS-2517. Immediately return rather than holding to variable and then returning (#192)
     add 287f9c1  HDDS-2514. removed unused method param (#189)
     add ac59c4f  HDDS-2486. Sonar: Avoid empty test methods (#220)
     add e5a3b0c  HDDS-2509. Code cleanup in replication package (#185)
     add dcfe5f3  Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)"
     add 87d5a5a  HDDS-2533. Disable failing acceptance and unit tests
     add 814cb72  HDDS-2547. Sonar: remove volatile keyword from BlockOutputStream bloc… (#229)
     add c3b14f6  HDDS-2442. Added support for service name in OM for CSR (#177)
     add 83c66a7  HDDS-2548. Refactored return type as interface rather than impl. (#228)
     add cb7e146  HDDS-2546. Reorder the modifiers to comply (#227)
     add eda4575  HDDS-2544. Refactored for unnecessary toString (#225)
     add 1d66049  HDDS-2543. Refactored for log format. (#224)
     add 6186cf9  HDDS-2448 Delete container command should use a thread pool (#142)
     add da36b09  HDDS-2550. Sonar: OzoneClient should be closed in GetAclKeyHandler (#231)
     add d7ef779  HDDS-2545. Remove empty statement (#226)
     add ba76d23  HDDS-2549. Invoke method(s) only conditionally (#230)
     add b10ac6b  HDDS-2535. TestOzoneManagerDoubleBufferWithOMResponse is flaky. (#216)
     add 8ad88ce  HDDS-2516. Code cleanup in EventQueue (#196)
     add 2fea0af  HDDS-2467. Allow running Freon validators with limited memory (#152)
     add f29de78  HDDS-2247. Delete FileEncryptionInfo from KeyInfo when a Key is deleted (#200)
     add e68b113  HDDS-2493. Sonar: Locking on a parameter in NetUtils.removeOutscope. (#174)
     add 2134dec  HDDS-2485. Disable XML external entity processing
     add 92dcfdb  HDDS-2580. Ensure resources are closed in Get/PutKeyHandler (#235)
     add b1ac520  HDDS-2498. Fix sonar issues found in StorageContainerManager. (#178)
     add e47acd1  HDDS-2512 Sonar TraceAllMethod NPE Could be Thrown (#193)
     add 3c334f6  HDDS-2241. Optimize the refresh pipeline logic used by KeyManagerImpl… (#194)
     add bdc11fc  HDDS-2523. BufferPool.releaseBuffer may release a buffer different than the head of the list (#232)
     add 26ffa6b  HDDS-2522. Fix TestSecureOzoneCluster (#207)
     add e7fd407  HDDS-2598. Remove unused private field "LOG" (#241)
     add fab0242  HDDS-2597. Remove toString() as log calls it implicitly (#240)
     add b557aec  HDDS-2594. S3 RangeReads failing with NumberFormatException. (#242)
     add bcb13d4  HDDS-2538. Fix issues found in DatabaseHelper. (#221)
     add d29838b  HDDS-2536. Add ozone.om.internal.service.id to OM HA configuration. (#218)
     add 7d35218  HDDS-2394. Ozone S3 Gateway allows bucket name with underscore to be created (#243)
     add 571d4a3  HDDS-2605. Use LongSupplier to avoid boxing (#256)
     add a731eea  HDDS-2603. Avoid unnecessary boxing in XceiverClientReply (#255)
     add a2a19f7  HDDS-2300. Publish normalized Ratis metrics via the prometheus endpoint  (#127)
     add 6105e31  HDDS-2587. Enable sonarcloud measurement as part of CI builds (#253)
     add 678e4bc  HDDS-2454. Improve OM HA robot tests. (#136)
     add 46a0875  HDDS-2510. Use isEmpty() to check whether the collection is empty or not in Ozone Manager module. (#258)
     add 2185ac7  HDDS-2591. No tailMap needed for startIndex 0 in ContainerSet#listContainer (#254)
     new 748cb01  HDDS-1577. Add default pipeline placement policy implementation. (#1366)
     new d88e9bf  HDDS-1571. Create an interface for pipeline placement policy to support network topologies. (#1395)
     new 2b9ad2d  HDDS-2089: Add createPipeline CLI. (#1418)
     new 32af34f  HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (07e5348)
            \
             N -- N -- N   refs/heads/HDDS-1564 (32af34f)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
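
For readers who want to reproduce this listing against a local clone, the following is a minimal, hypothetical sketch using the JGit library (JGit is not part of the ASF notification tooling; the clone path is an assumption, and it assumes the old tip 07e5348 and the new tip 32af34f are both still present in the local object store). It prints the commits reachable from the rewritten branch tip but not from the old tip, i.e. the N revisions above the common base B in the diagram.

import java.io.File;
import org.eclipse.jgit.api.Git;
import org.eclipse.jgit.lib.ObjectId;
import org.eclipse.jgit.lib.Repository;
import org.eclipse.jgit.revwalk.RevCommit;

public class ListRewrittenBranchCommits {
  public static void main(String[] args) throws Exception {
    // Path to a local clone of hadoop-ozone; adjust as needed (assumption).
    try (Git git = Git.open(new File("hadoop-ozone"))) {
      Repository repo = git.getRepository();
      // Old and new branch tips, taken from the notification above.
      // resolve() returns null if the object is not in the local object store.
      ObjectId oldTip = repo.resolve("07e5348");
      ObjectId newTip = repo.resolve("32af34f");
      if (oldTip == null || newTip == null) {
        System.err.println("One of the tips is not available locally.");
        return;
      }
      // Commits reachable from the new tip but not from the old tip:
      // the "N" revisions above the common base B.
      for (RevCommit c : git.log().add(newTip).not(oldTip).call()) {
        System.out.println(c.getName().substring(0, 7) + "  " + c.getShortMessage());
      }
    }
  }
}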


Summary of changes:
 dev-support/bin/qbt => .asf.yaml                   |  18 +-
 .../buildenv/Dockerfile                            |   8 +-
 .../buildenv/entrypoint.sh                         |   6 +-
 .github/workflows/post-commit.yml                  | 120 +++++++
 .github/workflows/pr.yml                           | 114 ++++++
 CONTRIBUTION.md                                    |   5 +-
 README.md                                          |   2 +-
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  60 ++--
 .../hadoop/hdds/scm/XceiverClientManager.java      |   2 +-
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |   3 +-
 .../hdds/scm/client/ContainerOperationClient.java  |  76 +---
 .../hadoop/hdds/scm/client/HddsClientUtils.java    |  42 +--
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |  11 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  53 ++-
 .../apache/hadoop/hdds/scm/storage/BufferPool.java |  21 +-
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |   6 +-
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |  21 +-
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  87 ++++-
 .../hadoop/hdds/cli/HddsVersionProvider.java       |   3 +-
 .../org/apache/hadoop/hdds/client/BlockID.java     |  12 +-
 .../hadoop/hdds/client/ContainerBlockID.java       |  13 +-
 .../apache/hadoop/hdds/conf/HddsConfServlet.java   |   2 +-
 .../hadoop/hdds/protocol/SCMSecurityProtocol.java  |   4 +-
 .../hdds/protocolPB/SCMSecurityProtocolPB.java     |   5 +-
 .../hdds/ratis/ContainerCommandRequestMessage.java |   3 +-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  12 -
 .../java/org/apache/hadoop/hdds/scm/ScmConfig.java |  76 ++++
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  11 +-
 .../apache/hadoop/hdds/scm/XceiverClientReply.java |  18 +-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |   2 +-
 .../apache/hadoop/hdds/scm/client/ScmClient.java   |   9 -
 .../org/apache/hadoop/hdds/scm/net/NetUtils.java   |  22 +-
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |  10 +-
 .../hadoop/hdds/scm/net/NodeSchemaLoader.java      |  27 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  55 ++-
 .../scm/protocol/ScmBlockLocationProtocol.java     |   5 +-
 .../protocol/StorageContainerLocationProtocol.java |  23 +-
 .../scm/protocolPB/ScmBlockLocationProtocolPB.java |   4 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |  37 +-
 .../StorageContainerLocationProtocolPB.java        |   4 +-
 .../hdds/security/token/BlockTokenVerifier.java    | 114 +++---
 .../security/token/OzoneBlockTokenIdentifier.java  |   3 +-
 .../hadoop/hdds/security/token/TokenVerifier.java  |  13 +-
 .../authority/PKIProfiles/DefaultProfile.java      |   4 +
 .../certificates/utils/CertificateSignRequest.java |  41 ++-
 .../apache/hadoop/hdds/tracing/StringCodec.java    |  17 +-
 .../apache/hadoop/hdds/tracing/TraceAllMethod.java |   5 +
 .../apache/hadoop/hdds/tracing/TracingUtil.java    |  64 ++--
 .../org/apache/hadoop/hdds/utils/db/DBProfile.java |   2 +-
 .../apache/hadoop/hdds/utils/db/RDBMetrics.java    |  75 ++++
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  61 ++--
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |  18 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   4 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   2 +
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   1 +
 .../org/apache/hadoop/ozone/common/Checksum.java   |  37 +-
 .../apache/hadoop/ozone/common/ChecksumData.java   |  28 +-
 .../apache/hadoop/ozone/common/ChunkBuffer.java    |  87 +++++
 .../common/ChunkBufferImplWithByteBuffer.java      | 108 ++++++
 .../ozone/container/common/helpers/BlockData.java  |  32 +-
 .../java/org/apache/hadoop/ozone/lease/Lease.java  |  46 ++-
 .../hadoop/ozone/lease/LeaseCallbackExecutor.java  |   5 +-
 .../apache/hadoop/ozone/lease/LeaseManager.java    |  20 +-
 .../apache/hadoop/ozone/lease/package-info.java    |   6 +-
 .../proto/StorageContainerLocationProtocol.proto   |  30 +-
 hadoop-hdds/common/src/main/proto/hdds.proto       |  12 +-
 .../common/src/main/resources/ozone-default.xml    |  47 +--
 .../ratis/TestContainerCommandRequestMessage.java  |   8 +-
 .../hadoop/hdds/scm/net/TestNodeSchemaLoader.java  | 151 ++++----
 .../certificate/authority/TestDefaultCAServer.java |   1 +
 .../certificate/authority/TestDefaultProfile.java  |   3 +-
 .../certificates/TestCertificateSignRequest.java   |  33 ++
 .../hadoop/hdds/utils/db/TestRDBTableStore.java    |  53 ++-
 .../hdds/utils/db/TestTypedRDBTableStore.java      |   4 -
 .../hadoop/ozone/common/TestChunkBuffer.java       | 134 +++++++
 .../hadoop/ozone/lease/TestLeaseManager.java       |   7 +-
 .../apache/hadoop/ozone/lease/package-info.java    |   8 +-
 .../src/test/resources/log4j.properties            |   0
 .../{good.xml => external-entity.xml}              |   7 +-
 .../hadoop/hdds/conf/ConfigFileAppender.java       |  21 +-
 .../org/apache/hadoop/hdds/conf/ConfigTag.java     |   3 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  21 +-
 .../container/common/impl/ContainerDataYaml.java   |   5 +-
 .../ozone/container/common/impl/ContainerSet.java  |   2 +-
 .../container/common/impl/HddsDispatcher.java      |  46 ++-
 .../ozone/container/common/interfaces/Handler.java |  60 ++--
 .../common/report/PipelineReportPublisher.java     |   4 +-
 .../container/common/report/ReportPublisher.java   |   2 +-
 .../common/statemachine/DatanodeConfiguration.java |  91 +++++
 .../common/statemachine/DatanodeStateMachine.java  |  16 +-
 .../CloseContainerCommandHandler.java              |   3 +-
 .../commandhandler/CommandDispatcher.java          |   6 +
 .../commandhandler/CommandHandler.java             |   8 +
 .../DeleteContainerCommandHandler.java             |  70 +++-
 .../ReplicateContainerCommandHandler.java          |   4 +-
 .../transport/server/GrpcXceiverService.java       |  24 +-
 .../server/ServerCredentialInterceptor.java        |  74 ----
 .../common/transport/server/XceiverServer.java     |  89 -----
 .../common/transport/server/XceiverServerGrpc.java |  18 +-
 .../server/ratis/ContainerStateMachine.java        |  50 +--
 .../transport/server/ratis/XceiverServerRatis.java |  40 ++-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  12 +-
 .../container/keyvalue/TarContainerPacker.java     | 170 ++++-----
 .../container/keyvalue/helpers/ChunkUtils.java     |  55 +--
 .../keyvalue/impl/ChunkManagerDummyImpl.java       |  11 +-
 .../container/keyvalue/impl/ChunkManagerImpl.java  |   5 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  66 ++--
 .../replication/ContainerStreamingOutput.java      |   5 +-
 .../container/replication/GrpcOutputStream.java    | 129 +++++++
 .../replication/GrpcReplicationClient.java         |  57 +--
 .../replication/GrpcReplicationService.java        |  94 +----
 .../OnDemandContainerReplicationSource.java        |  13 +-
 .../replication/ReplicationSupervisor.java         |  61 +++-
 .../protocol/StorageContainerDatanodeProtocol.java |   5 +-
 .../StorageContainerDatanodeProtocolPB.java        |   4 +-
 .../proto/StorageContainerDatanodeProtocol.proto   |   1 +
 .../container/common/impl/TestContainerSet.java    |  39 ++-
 .../container/common/impl/TestHddsDispatcher.java  |  20 +-
 .../container/common/interfaces/TestHandler.java   |  10 +-
 .../container/keyvalue/TestKeyValueHandler.java    |  10 +-
 .../TestKeyValueHandlerWithUnhealthyContainer.java |   7 +-
 .../container/keyvalue/helpers/TestChunkUtils.java |  19 +
 .../replication/TestGrpcOutputStream.java          | 213 ++++++++++++
 .../replication/TestReplicationSupervisor.java     | 218 +++++++++---
 hadoop-hdds/docs/content/beyond/RunningWithHDFS.md |   4 +-
 hadoop-hdds/framework/pom.xml                      |  13 +-
 .../hadoop/hdds/server/PrometheusServlet.java      |  13 +-
 .../hadoop/hdds/server/RatisDropwizardExports.java |  31 +-
 .../hdds/server/RatisNameRewriteSampleBuilder.java | 104 ++++++
 .../hadoop/hdds/server/events/EventExecutor.java   |   2 +-
 .../hadoop/hdds/server/events/EventHandler.java    |   4 +-
 .../hadoop/hdds/server/events/EventPublisher.java  |   2 +-
 .../hadoop/hdds/server/events/EventQueue.java      |  13 +-
 .../hadoop/hdds/server/events/EventWatcher.java    |  46 +--
 .../hdds/server/events/EventWatcherMetrics.java    |  11 +-
 .../hdds/server/events/SingleThreadExecutor.java   |  36 +-
 .../hadoop/hdds/server/events/TypedEvent.java      |   4 +-
 .../hadoop/hdds/server/events/package-info.java    |   7 +-
 .../hdds/server/TestRatisDropwizardExports.java    |  66 ++++
 .../hadoop/hdds/server/TestRatisNameRewrite.java   | 107 ++++++
 .../hdds/server/events/EventHandlerStub.java       |   4 +-
 hadoop-hdds/pom.xml                                |   3 +-
 .../hdds/scm/container/ReplicationManager.java     |   4 +-
 .../hdds/scm/pipeline/PipelineReportHandler.java   |  50 +--
 .../hdds/scm/pipeline/PipelineStateManager.java    |   8 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  13 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   2 +-
 ...inerLocationProtocolServerSideTranslatorPB.java |  35 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  28 +-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java  |   5 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  49 ++-
 .../hdds/scm/server/SCMHTTPServerConfig.java       |  82 +++++
 .../hdds/scm/server/SCMSecurityProtocolServer.java |   3 +-
 .../hdds/scm/server/StorageContainerManager.java   |  88 +++--
 .../server/StorageContainerManagerHttpServer.java  |   6 +-
 .../scm/server/StorageContainerManagerStarter.java |   5 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  14 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   2 +
 .../scm/pipeline/MockRatisPipelineProvider.java    |  15 +
 .../safemode/TestHealthyPipelineSafeModeRule.java  |   3 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |   3 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |   6 +-
 .../ozone/container/replication/package-info.java  |  18 -
 .../apache/hadoop/ozone/client/ObjectStore.java    |   2 +-
 .../hadoop/ozone/client/io/KeyInputStream.java     |  13 +-
 .../ozone/client/protocol/ClientProtocol.java      |  22 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  52 ++-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  24 +-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |  18 +
 .../hadoop/ozone/om/exceptions/OMException.java    |  12 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  11 +
 .../om/helpers/OmMultipartUploadCompleteList.java  |   8 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  21 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  44 ++-
 .../src/main/proto/OzoneManagerProtocol.proto      |  27 ++
 .../checks/{isolation.sh => hadolint.sh}           |  28 +-
 hadoop-ozone/dev-support/checks/integration.sh     |   3 +-
 hadoop-ozone/dev-support/checks/shellcheck.sh      |   1 -
 .../dev-support/checks/{build.sh => sonar.sh}      |  10 +-
 hadoop-ozone/dev-support/checks/unit.sh            |   2 +-
 .../dist/dev-support/bin/dist-layout-stitching     |   4 +-
 hadoop-ozone/dist/pom.xml                          |   2 +-
 .../main/compose/ozone-hdfs/docker-compose.yaml    |   2 +-
 .../dist/src/main/compose/ozone-hdfs/docker-config |   1 -
 .../.ssh/authorized_keys                           |   0
 .../{ozonescripts => ozone-om-ha}/.ssh/config      |   0
 .../{ozonescripts => ozone-om-ha}/.ssh/environment |   0
 .../{ozonescripts => ozone-om-ha}/.ssh/id_rsa      |   0
 .../{ozonescripts => ozone-om-ha}/.ssh/id_rsa.pub  |   0
 .../{ozonescripts => ozone-om-ha}/Dockerfile       |  19 +-
 .../main/compose/ozone-om-ha/docker-compose.yaml   |  40 ++-
 .../src/main/compose/ozone-om-ha/docker-config     |  58 +++-
 .../{ozones3/test.sh => ozone-om-ha/run.sh}        |  14 +-
 .../{ozonescripts/ps.sh => ozone-om-ha/startOM.sh} |  13 +-
 .../stop.sh => ozone-om-ha/stopOM.sh}              |   5 +-
 .../src/main/compose/ozoneperf/docker-compose.yaml |   4 +
 .../dist/src/main/compose/ozonescripts/Dockerfile  |   6 +-
 .../dist/src/main/compose/ozonescripts/README.md   |   6 +-
 .../main/compose/ozonescripts/docker-compose.yaml  |  18 +-
 hadoop-ozone/dist/src/main/docker/Dockerfile       |   2 +
 .../{dockerbin => dockerlibexec}/entrypoint.sh     |   0
 .../main/{dockerbin => dockerlibexec}/envtoconf.py |   0
 .../{dockerbin => dockerlibexec}/transformation.py |   0
 .../src/main/smoketest/basic/ozone-shell.robot     |   6 +-
 .../dist/src/main/smoketest/omha/testOMHA.robot    | 182 ++++++++++
 .../dist/src/main/smoketest/ozonefs/ozonefs.robot  |  14 +-
 .../src/main/smoketest/s3/MultipartUpload.robot    |  31 +-
 .../dist/src/main/smoketest/s3/bucketcreate.robot  |   4 +
 .../apache/hadoop/ozone/insight/LogSubcommand.java |   2 +-
 .../ozone/insight/datanode/RatisInsight.java       |  32 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 147 ++++----
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  58 +++-
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |   6 +-
 .../TestContainerStateMachineIdempotency.java      |   2 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       | 385 +++++++++------------
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |  23 +-
 .../rpc/TestContainerStateMachineFailures.java     |   2 +-
 .../ozone/client/rpc/TestKeyInputStream.java       | 104 +++++-
 .../client/rpc/TestOzoneAtRestEncryption.java      |  82 +++++
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  11 +-
 .../ozone/container/ContainerTestHelper.java       |  34 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../container/common/helpers/TestBlockData.java    |  12 +
 .../transport/server/ratis/TestCSMMetrics.java     |   2 +-
 .../container/metrics/TestContainerMetrics.java    |  15 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   2 +-
 .../container/server/TestContainerServer.java      |   9 +-
 .../server/TestSecureContainerServer.java          | 192 +++++-----
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java | 133 ++++++-
 .../ozone/om/TestOzoneManagerConfiguration.java    |  51 +++
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java |  38 +-
 .../scm/TestGetCommittedBlockLengthAndPutKey.java  |   4 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   3 +
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |   2 +-
 .../java/org/apache/hadoop/ozone/om/IOzoneAcl.java |   2 +-
 .../apache/hadoop/ozone/om/KeyDeletingService.java |   2 +-
 .../org/apache/hadoop/ozone/om/KeyManager.java     |  19 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 212 ++++--------
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |   9 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  54 ++-
 .../hadoop/ozone/om/OzoneManagerStarter.java       |   3 +-
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  |   2 +-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |   2 +-
 .../apache/hadoop/ozone/om/ha/OMHANodeDetails.java |  21 +-
 .../hadoop/ozone/om/request/OMClientRequest.java   |  18 +-
 .../S3MultipartUploadCommitPartRequest.java        |  23 +-
 .../S3MultipartUploadCompleteRequest.java          | 241 +++++++------
 .../S3MultipartUploadCompleteResponse.java         |  29 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   2 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |  36 +-
 .../hadoop/ozone/web/ozShell/OzoneShell.java       |   2 +-
 .../web/ozShell/bucket/AddAclBucketHandler.java    |  40 ++-
 .../web/ozShell/bucket/CreateBucketHandler.java    |  64 ++--
 .../web/ozShell/bucket/DeleteBucketHandler.java    |  20 +-
 .../web/ozShell/bucket/GetAclBucketHandler.java    |  37 +-
 .../web/ozShell/bucket/InfoBucketHandler.java      |  22 +-
 .../web/ozShell/bucket/ListBucketHandler.java      |  44 +--
 .../web/ozShell/bucket/RemoveAclBucketHandler.java |  41 ++-
 .../ozone/web/ozShell/bucket/S3BucketMapping.java  |  26 +-
 .../web/ozShell/bucket/SetAclBucketHandler.java    |  37 +-
 .../ozone/web/ozShell/keys/AddAclKeyHandler.java   |  47 +--
 .../ozone/web/ozShell/keys/DeleteKeyHandler.java   |  25 +-
 .../ozone/web/ozShell/keys/GetAclKeyHandler.java   |  44 +--
 .../ozone/web/ozShell/keys/GetKeyHandler.java      |  80 +++--
 .../ozone/web/ozShell/keys/InfoKeyHandler.java     |  39 ++-
 .../ozone/web/ozShell/keys/ListKeyHandler.java     |  76 ++--
 .../ozone/web/ozShell/keys/PutKeyHandler.java      |  92 ++---
 .../web/ozShell/keys/RemoveAclKeyHandler.java      |  47 +--
 .../ozone/web/ozShell/keys/RenameKeyHandler.java   |  26 +-
 .../ozone/web/ozShell/keys/SetAclKeyHandler.java   |  41 ++-
 .../ozone/web/ozShell/s3/GetS3SecretHandler.java   |  25 +-
 .../hadoop/ozone/web/ozShell/s3/S3Shell.java       |   5 +-
 .../web/ozShell/token/CancelTokenHandler.java      |  34 +-
 .../ozone/web/ozShell/token/GetTokenHandler.java   |  38 +-
 .../ozone/web/ozShell/token/RenewTokenHandler.java |  38 +-
 .../web/ozShell/volume/AddAclVolumeHandler.java    |  35 +-
 .../web/ozShell/volume/CreateVolumeHandler.java    |  55 +--
 .../web/ozShell/volume/DeleteVolumeHandler.java    |  15 +-
 .../web/ozShell/volume/GetAclVolumeHandler.java    |  30 +-
 .../web/ozShell/volume/InfoVolumeHandler.java      |  11 +-
 .../web/ozShell/volume/ListVolumeHandler.java      |  62 ++--
 .../web/ozShell/volume/RemoveAclVolumeHandler.java |  35 +-
 .../web/ozShell/volume/SetAclVolumeHandler.java    |  43 ++-
 .../web/ozShell/volume/UpdateVolumeHandler.java    |  22 +-
 ...TestOzoneManagerDoubleBufferWithOMResponse.java |  13 +-
 .../request/TestOMClientRequestWithUserInfo.java   |  13 +-
 .../security/TestOzoneBlockTokenSecretManager.java |  48 +++
 hadoop-ozone/ozonefs-lib-current/pom.xml           |  15 +
 hadoop-ozone/pom.xml                               |   3 +-
 .../org/apache/hadoop/ozone/recon/ReconServer.java |   3 +
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  22 +-
 .../ozone/recon/api/ContainerKeyService.java       |  22 +-
 .../hadoop/ozone/recon/api/types/KeyMetadata.java  |   6 +-
 .../recon/spi/OzoneManagerServiceProvider.java     |   2 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  30 +-
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |  20 +-
 .../endpoint/MultiDeleteRequestUnmarshaller.java   |   8 +-
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   | 244 +++++++------
 .../hadoop/ozone/s3/exception/S3ErrorTable.java    |   4 +
 .../ozone/s3/util/RangeHeaderParserUtil.java       |   4 +-
 .../hadoop/ozone/client/OzoneBucketStub.java       |  26 +-
 .../hadoop/ozone/client/OzoneOutputStreamStub.java |   8 +-
 .../s3/endpoint/TestMultipartUploadComplete.java   |   8 +-
 .../ozone/s3/util/TestRangeHeaderParserUtil.java   |   6 +-
 .../ozone/audit/parser/common/DatabaseHelper.java  | 157 ++++-----
 .../audit/parser/handler/LoadCommandHandler.java   |  11 +-
 .../audit/parser/handler/QueryCommandHandler.java  |  11 +-
 .../parser/handler/TemplateCommandHandler.java     |  11 +-
 .../hadoop/ozone/freon/BaseFreonGenerator.java     |  38 +-
 .../hadoop/ozone/freon/ContentGenerator.java       |   6 +-
 .../hadoop/ozone/freon/DatanodeChunkGenerator.java | 190 ++++++++++
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |  12 +-
 .../ozone/freon/OzoneClientKeyGenerator.java       |  15 +-
 .../ozone/freon/OzoneClientKeyValidator.java       |  93 ++++-
 .../org/apache/hadoop/ozone/freon/ProgressBar.java |  33 +-
 .../hadoop/ozone/freon/RandomKeyGenerator.java     |   4 +-
 .../apache/hadoop/ozone/freon/SameKeyReader.java   |  65 +---
 .../ozone/genesis/BenchMarkDatanodeDispatcher.java |  84 ++---
 .../ozone/genesis/BenchmarkBlockDataToString.java  | 166 +++++++++
 .../org/apache/hadoop/ozone/genesis/Genesis.java   |   5 +-
 .../apache/hadoop/ozone/freon/TestProgressBar.java |  24 +-
 pom.xml                                            |  21 +-
 323 files changed, 7009 insertions(+), 3593 deletions(-)
 copy dev-support/bin/qbt => .asf.yaml (71%)
 mode change 100755 => 100644
 copy hadoop-hdds/container-service/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider => .github/buildenv/Dockerfile (83%)
 copy hadoop-hdds/container-service/src/main/resources/META-INF/services/com.sun.jersey.spi.container.ContainerProvider => .github/buildenv/entrypoint.sh (85%)
 mode change 100644 => 100755
 create mode 100644 .github/workflows/post-commit.yml
 create mode 100644 .github/workflows/pr.yml
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfig.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/RDBMetrics.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBuffer.java
 create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBuffer.java
 copy hadoop-hdds/{container-service => common}/src/test/resources/log4j.properties (100%)
 copy hadoop-hdds/common/src/test/resources/networkTopologyTestFiles/{good.xml => external-entity.xml} (93%)
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
 delete mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ServerCredentialInterceptor.java
 delete mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
 create mode 100644 hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/GrpcOutputStream.java
 create mode 100644 hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestGrpcOutputStream.java
 copy hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/WithMetadata.java => hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/RatisDropwizardExports.java (58%)
 create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/RatisNameRewriteSampleBuilder.java
 create mode 100644 hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/TestRatisDropwizardExports.java
 create mode 100644 hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/TestRatisNameRewrite.java
 create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMHTTPServerConfig.java
 delete mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/replication/package-info.java
 copy hadoop-ozone/dev-support/checks/{isolation.sh => hadolint.sh} (70%)
 copy hadoop-ozone/dev-support/checks/{build.sh => sonar.sh} (68%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts => ozone-om-ha}/.ssh/authorized_keys (100%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts => ozone-om-ha}/.ssh/config (100%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts => ozone-om-ha}/.ssh/environment (100%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts => ozone-om-ha}/.ssh/id_rsa (100%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts => ozone-om-ha}/.ssh/id_rsa.pub (100%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts => ozone-om-ha}/Dockerfile (77%)
 copy hadoop-ozone/dist/src/main/compose/{ozones3/test.sh => ozone-om-ha/run.sh} (73%)
 mode change 100755 => 100644
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts/ps.sh => ozone-om-ha/startOM.sh} (79%)
 copy hadoop-ozone/dist/src/main/compose/{ozonescripts/stop.sh => ozone-om-ha/stopOM.sh} (93%)
 rename hadoop-ozone/dist/src/main/{dockerbin => dockerlibexec}/entrypoint.sh (100%)
 rename hadoop-ozone/dist/src/main/{dockerbin => dockerlibexec}/envtoconf.py (100%)
 rename hadoop-ozone/dist/src/main/{dockerbin => dockerlibexec}/transformation.py (100%)
 create mode 100644 hadoop-ozone/dist/src/main/smoketest/omha/testOMHA.robot
 create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
 create mode 100644 hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchmarkBlockDataToString.java


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


[hadoop-ozone] 02/04: HDDS-1571. Create an interface for pipeline placement policy to support network topologies. (#1395)

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit d88e9bf23a4d7caa15a9d05fa06cd5a8d956744b
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Sep 10 20:15:51 2019 +0800

    HDDS-1571. Create an interface for pipeline placement policy to support network topologies. (#1395)
    
    (cherry picked from commit 753fc6703a39154ed6013e44dbae572391748906)
---
 ...erPlacementPolicy.java => PlacementPolicy.java} | 12 +++----
 .../placement/algorithms/package-info.java         | 21 -----------
 .../common/src/main/resources/ozone-default.xml    |  6 ++--
 ...onPolicy.java => SCMCommonPlacementPolicy.java} | 23 ++++++------
 .../hdds/scm/container/ReplicationManager.java     | 13 +++----
 .../ContainerPlacementPolicyFactory.java           | 18 +++++-----
 .../algorithms/SCMContainerPlacementCapacity.java  |  4 ++-
 .../algorithms/SCMContainerPlacementRackAware.java | 12 +++----
 .../algorithms/SCMContainerPlacementRandom.java    |  6 ++--
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 42 +++++++++++++---------
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 14 ++++----
 .../hdds/scm/server/StorageContainerManager.java   |  4 +--
 .../hdds/scm/container/TestReplicationManager.java |  7 ++--
 .../algorithms/TestContainerPlacementFactory.java  |  7 ++--
 .../hdds/scm/node/TestContainerPlacement.java      |  5 ++-
 .../hdds/scm/safemode/TestSafeModeHandler.java     |  5 ++-
 .../hadoop/ozone/TestContainerOperations.java      |  5 ++-
 .../TestContainerStateMachineIdempotency.java      |  5 ++-
 .../hadoop/ozone/dn/scrubber/TestDataScrubber.java |  4 +--
 .../hadoop/ozone/scm/TestContainerSmallFile.java   |  4 +--
 .../scm/TestGetCommittedBlockLengthAndPutKey.java  |  5 ++-
 21 files changed, 106 insertions(+), 116 deletions(-)
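
As context for the patch below, here is a hedged usage sketch of the renamed PlacementPolicy interface. The getPolicy and chooseDatanodes signatures are taken from the hunks that follow, and the import paths are inferred from the file paths in the diff; the wrapper class, the empty node lists, and the count/size values are illustrative assumptions only, not code from this commit.

import java.util.Collections;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.NodeManager;

// Illustrative wrapper class; not part of the patch.
public final class PlacementPolicyUsageSketch {

  private PlacementPolicyUsageSketch() {
  }

  public static List<DatanodeDetails> pickNodes(Configuration conf,
      NodeManager nodeManager, NetworkTopology clusterMap,
      SCMContainerPlacementMetrics metrics) throws SCMException {
    // After this commit the factory returns the PlacementPolicy interface
    // instead of ContainerPlacementPolicy (see the factory hunk below).
    PlacementPolicy policy = ContainerPlacementPolicyFactory.getPolicy(
        conf, nodeManager, clusterMap, true /* fallback */, metrics);
    // chooseDatanodes(excludedNodes, favoredNodes, nodesRequired, sizeRequired)
    // returns datanodes that satisfy the node-count and free-space constraints.
    // The node count and size here are example values only.
    return policy.chooseDatanodes(Collections.emptyList(),
        Collections.emptyList(), 3, 5L * 1024 * 1024 * 1024);
  }
}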

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
similarity index 80%
rename from hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
rename to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
index 52ce796..f6a0e8b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicy.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/PlacementPolicy.java
@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 
@@ -23,14 +23,14 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * A ContainerPlacementPolicy support choosing datanodes to build replication
- * pipeline with specified constraints.
+ * A PlacementPolicy support choosing datanodes to build
+ * pipelines or containers with specified constraints.
  */
-public interface ContainerPlacementPolicy {
+public interface PlacementPolicy {
 
   /**
-   * Given the replication factor and size required, return set of datanodes
-   * that satisfy the nodes and size requirement.
+   * Given an initial set of datanodes and the size required,
+   * return set of datanodes that satisfy the nodes and size requirement.
    *
    * @param excludedNodes - list of nodes to be excluded.
    * @param favoredNodes - list of nodes preferred.
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
deleted file mode 100644
index dac4752..0000000
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
-/**
- Contains container placement policy interface definition.
- **/
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 94e8557..2ad9948 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -826,9 +826,11 @@
     </value>
     <tag>OZONE, MANAGEMENT</tag>
     <description>
-      The full name of class which implements org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy.
+      The full name of class which implements
+      org.apache.hadoop.hdds.scm.PlacementPolicy.
       The class decides which datanode will be used to host the container replica. If not set,
-      org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom will be used as default value.
+      org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom will be used as default
+      value.
     </description>
   </property>
   <property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
similarity index 90%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
index 77cdd83..25457f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMCommonPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/SCMCommonPlacementPolicy.java
@@ -15,7 +15,7 @@
  * the License.
  */
 
-package org.apache.hadoop.hdds.scm.container.placement.algorithms;
+package org.apache.hadoop.hdds.scm;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -33,25 +33,25 @@ import java.util.Random;
 import java.util.stream.Collectors;
 
 /**
- * SCM CommonPolicy implements a set of invariants which are common
- * for all container placement policies, acts as the repository of helper
+ * This policy implements a set of invariants which are common
+ * for all basic placement policies, acts as the repository of helper
  * functions which are common to placement policies.
  */
-public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
+public abstract class SCMCommonPlacementPolicy implements PlacementPolicy {
   @VisibleForTesting
   static final Logger LOG =
-      LoggerFactory.getLogger(SCMCommonPolicy.class);
+      LoggerFactory.getLogger(SCMCommonPlacementPolicy.class);
   private final NodeManager nodeManager;
   private final Random rand;
   private final Configuration conf;
 
   /**
-   * Constructs SCM Common Policy Class.
+   * Constructor.
    *
    * @param nodeManager NodeManager
    * @param conf Configuration class.
    */
-  public SCMCommonPolicy(NodeManager nodeManager, Configuration conf) {
+  public SCMCommonPlacementPolicy(NodeManager nodeManager, Configuration conf) {
     this.nodeManager = nodeManager;
     this.rand = new Random();
     this.conf = conf;
@@ -85,7 +85,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
   }
 
   /**
-   * Given the replication factor and size required, return set of datanodes
+   * Given size required, return set of datanodes
    * that satisfy the nodes and size requirement.
    * <p>
    * Here are some invariants of container placement.
@@ -149,7 +149,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    * @param datanodeDetails DatanodeDetails
    * @return true if we have enough space.
    */
-  boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
+  public boolean hasEnoughSpace(DatanodeDetails datanodeDetails,
       long sizeRequired) {
     SCMNodeMetric nodeMetric = nodeManager.getNodeStat(datanodeDetails);
     return (nodeMetric != null) && (nodeMetric.get() != null)
@@ -164,7 +164,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
    * @param nodesRequired - Nodes Required
    * @param healthyNodes - List of Nodes in the result set.
    * @return List of Datanodes that can be used for placement.
-   * @throws SCMException
+   * @throws SCMException SCMException
    */
   public List<DatanodeDetails> getResultSet(
       int nodesRequired, List<DatanodeDetails> healthyNodes)
@@ -190,8 +190,7 @@ public abstract class SCMCommonPolicy implements ContainerPlacementPolicy {
 
   /**
    * Choose a datanode according to the policy, this function is implemented
-   * by the actual policy class. For example, PlacementCapacity or
-   * PlacementRandom.
+   * by the actual policy class.
    *
    * @param healthyNodes - Set of healthy nodes we can choose from.
    * @return DatanodeDetails
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 37afd36..24f4ef2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -38,8 +38,9 @@ import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.metrics2.MetricsCollector;
@@ -85,7 +86,7 @@ public class ReplicationManager implements MetricsSource {
    * PlacementPolicy which is used to identify where a container
    * should be replicated.
    */
-  private final ContainerPlacementPolicy containerPlacement;
+  private final PlacementPolicy containerPlacement;
 
   /**
    * EventPublisher to fire Replicate and Delete container events.
@@ -131,12 +132,12 @@ public class ReplicationManager implements MetricsSource {
    *
    * @param conf OzoneConfiguration
    * @param containerManager ContainerManager
-   * @param containerPlacement ContainerPlacementPolicy
+   * @param containerPlacement PlacementPolicy
    * @param eventPublisher EventPublisher
    */
   public ReplicationManager(final ReplicationManagerConfiguration conf,
                             final ContainerManager containerManager,
-                            final ContainerPlacementPolicy containerPlacement,
+                            final PlacementPolicy containerPlacement,
                             final EventPublisher eventPublisher,
                             final LockManager<ContainerID> lockManager) {
     this.containerManager = containerManager;
@@ -474,7 +475,7 @@ public class ReplicationManager implements MetricsSource {
 
   /**
    * If the given container is under replicated, identify a new set of
-   * datanode(s) to replicate the container using ContainerPlacementPolicy
+   * datanode(s) to replicate the container using PlacementPolicy
    * and send replicate container command to the identified datanode(s).
    *
    * @param container ContainerInfo
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index 18ec2c3..adaeb87 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdds.scm.container.placement.algorithms;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -34,22 +35,23 @@ public final class ContainerPlacementPolicyFactory {
   private static final Logger LOG =
       LoggerFactory.getLogger(ContainerPlacementPolicyFactory.class);
 
-  private static final Class<? extends ContainerPlacementPolicy>
+  private static final Class<? extends PlacementPolicy>
       OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT =
       SCMContainerPlacementRandom.class;
 
   private ContainerPlacementPolicyFactory() {
   }
 
-  public static ContainerPlacementPolicy getPolicy(Configuration conf,
-      final NodeManager nodeManager, NetworkTopology clusterMap,
-      final boolean fallback, SCMContainerPlacementMetrics metrics)
-      throws SCMException{
-    final Class<? extends ContainerPlacementPolicy> placementClass = conf
+
+  public static PlacementPolicy getPolicy(Configuration conf,
+    final NodeManager nodeManager, NetworkTopology clusterMap,
+    final boolean fallback, SCMContainerPlacementMetrics metrics)
+    throws SCMException{
+    final Class<? extends PlacementPolicy> placementClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
             OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
-            ContainerPlacementPolicy.class);
-    Constructor<? extends ContainerPlacementPolicy> constructor;
+            PlacementPolicy.class);
+    Constructor<? extends PlacementPolicy> constructor;
     try {
       constructor = placementClass.getDeclaredConstructor(NodeManager.class,
           Configuration.class, NetworkTopology.class, boolean.class,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
index 85d281c..1909344 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementCapacity.java
@@ -21,6 +21,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -65,7 +66,8 @@ import org.slf4j.LoggerFactory;
  * little or no work and the cluster will achieve a balanced distribution
  * over time.
  */
-public final class SCMContainerPlacementCapacity extends SCMCommonPolicy {
+public final class SCMContainerPlacementCapacity
+    extends SCMCommonPlacementPolicy {
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementCapacity.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
index 6d49459..8933fe9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRackAware.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -45,7 +46,8 @@ import java.util.List;
  * recommend to use this if the network topology has more layers.
  * <p>
  */
-public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
+public final class SCMContainerPlacementRackAware
+    extends SCMCommonPlacementPolicy {
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementRackAware.class);
@@ -271,11 +273,9 @@ public final class SCMContainerPlacementRackAware extends SCMCommonPolicy {
         throw new SCMException("No satisfied datanode to meet the" +
             " excludedNodes and affinityNode constrains.", null);
       }
-      if (hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Datanode {} is chosen for container. Required size is {}",
-              node.toString(), sizeRequired);
-        }
+      if (super.hasEnoughSpace((DatanodeDetails)node, sizeRequired)) {
+        LOG.debug("Datanode {} is chosen. Required size is {}",
+            node.toString(), sizeRequired);
         metrics.incrDatanodeChooseSuccessCount();
         if (isFallbacked) {
           metrics.incrDatanodeChooseFallbackCount();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
index 6b1a5c8..ce5d10d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementRandom.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hdds.scm.container.placement.algorithms;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -37,8 +39,8 @@ import java.util.List;
  * Balancer will need to support containers as a feature before this class
  * can be practically used.
  */
-public final class SCMContainerPlacementRandom extends SCMCommonPolicy
-    implements ContainerPlacementPolicy {
+public final class SCMContainerPlacementRandom extends SCMCommonPlacementPolicy
+    implements PlacementPolicy {
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(SCMContainerPlacementRandom.class);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index cb9954d..1983ed6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy;
+import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -47,7 +47,7 @@ import java.util.stream.Collectors;
  * 3. Choose an anchor node among the viable nodes.
  * 4. Choose other nodes around the anchor node based on network topology
  */
-public final class PipelinePlacementPolicy extends SCMCommonPolicy {
+public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
@@ -150,33 +150,41 @@ public final class PipelinePlacementPolicy extends SCMCommonPolicy {
   public List<DatanodeDetails> chooseDatanodes(
       List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
       int nodesRequired, final long sizeRequired) throws SCMException {
-    // get a list of viable nodes based on criteria
+    // Get a list of viable nodes based on criteria
+    // and make sure excludedNodes are excluded from list.
     List<DatanodeDetails> healthyNodes =
         filterViableNodes(excludedNodes, nodesRequired);
-
-    List<DatanodeDetails> results = new ArrayList<>();
-
+    
     // Randomly picks nodes when all nodes are equal.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
     if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
       LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
           "Required nodes: {}", nodesRequired);
-      results = super.getResultSet(nodesRequired, healthyNodes);
-      if (results.size() < nodesRequired) {
-        LOG.error("Unable to find the required number of healthy nodes that " +
-                "meet the criteria. Required nodes: {}, Found nodes: {}",
-            nodesRequired, results.size());
-        throw new SCMException("Unable to find required number of nodes.",
-            SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-      }
-      return results;
+      return super.getResultSet(nodesRequired, healthyNodes);
+    } else {
+      // Since topology and rack awareness are available, pick nodes
+      // based on them.
+      return this.getResultSet(nodesRequired, healthyNodes);
     }
+  }
 
+  /**
+   * Get result set based on the pipeline placement algorithm which considers
+   * network topology and rack awareness.
+   * @param nodesRequired - number of nodes required
+   * @param healthyNodes - list of healthy candidate nodes
+   * @return a list of datanodes
+   * @throws SCMException SCMException
+   */
+  @Override
+  public List<DatanodeDetails> getResultSet(
+      int nodesRequired, List<DatanodeDetails> healthyNodes)
+      throws SCMException {
+    List <DatanodeDetails> results = new ArrayList<>(nodesRequired);
     // Since nodes are widely distributed, the results should be selected
     // base on distance in topology, rack awareness and load balancing.
     List<DatanodeDetails> exclude = new ArrayList<>();
-    exclude.addAll(excludedNodes);
     // First choose an anchor nodes randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor == null) {
@@ -193,7 +201,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPolicy {
 
     // Choose the second node on different racks from anchor.
     DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
-        healthyNodes, excludedNodes,
+        healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
     if (nodeOnDifferentRack == null) {
       LOG.error("Unable to find nodes on different racks that " +
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 94443dd..913a435 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
@@ -103,16 +103,16 @@ public class RatisPipelineProvider implements PipelineProvider {
    * @return SCM container placement policy implementation instance.
    */
   @SuppressWarnings("unchecked")
-  // TODO: should we rename ContainerPlacementPolicy to PipelinePlacementPolicy?
-  private static ContainerPlacementPolicy createContainerPlacementPolicy(
+  // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
+  private static PlacementPolicy createContainerPlacementPolicy(
       final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends ContainerPlacementPolicy> implClass =
-        (Class<? extends ContainerPlacementPolicy>) conf.getClass(
+    Class<? extends PlacementPolicy> implClass =
+        (Class<? extends PlacementPolicy>) conf.getClass(
             ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
             SCMContainerPlacementRandom.class);
 
     try {
-      Constructor<? extends ContainerPlacementPolicy> ctor =
+      Constructor<? extends PlacementPolicy> ctor =
           implClass.getDeclaredConstructor(NodeManager.class,
               Configuration.class);
       return ctor.newInstance(nodeManager, conf);
@@ -125,7 +125,7 @@ public class RatisPipelineProvider implements PipelineProvider {
 //      LOG.error("Unhandled exception occurred, Placement policy will not " +
 //          "be functional.");
       throw new IllegalArgumentException("Unable to load " +
-          "ContainerPlacementPolicy", e);
+          "PlacementPolicy", e);
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 0249d7e..dad7b23 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -58,7 +58,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
@@ -392,7 +392,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
 
     SCMContainerPlacementMetrics placementMetrics =
         SCMContainerPlacementMetrics.create();
-    ContainerPlacementPolicy containerPlacementPolicy =
+    PlacementPolicy containerPlacementPolicy =
         ContainerPlacementPolicyFactory.getPolicy(conf, scmNodeManager,
             clusterMap, true, placementMetrics);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index 1631447..63735f7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -27,8 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.server.events.EventHandler;
@@ -66,7 +65,7 @@ public class TestReplicationManager {
 
   private ReplicationManager replicationManager;
   private ContainerStateManager containerStateManager;
-  private ContainerPlacementPolicy containerPlacementPolicy;
+  private PlacementPolicy containerPlacementPolicy;
   private EventQueue eventQueue;
   private DatanodeCommandHandler datanodeCommandHandler;
 
@@ -93,7 +92,7 @@ public class TestReplicationManager {
         .thenAnswer(invocation -> containerStateManager
             .getContainerReplicas((ContainerID)invocation.getArguments()[0]));
 
-    containerPlacementPolicy = Mockito.mock(ContainerPlacementPolicy.class);
+    containerPlacementPolicy = Mockito.mock(PlacementPolicy.class);
 
     Mockito.when(containerPlacementPolicy.chooseDatanodes(
         Mockito.anyListOf(DatanodeDetails.class),
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
index 18c4a64..81f8c10 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestContainerPlacementFactory.java
@@ -20,6 +20,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -99,7 +100,7 @@ public class TestContainerPlacementFactory {
     when(nodeManager.getNodeStat(datanodes.get(4)))
         .thenReturn(new SCMNodeMetric(storageCapacity, 70L, 30L));
 
-    ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
+    PlacementPolicy policy = ContainerPlacementPolicyFactory
         .getPolicy(conf, nodeManager, cluster, true,
             SCMContainerPlacementMetrics.create());
 
@@ -117,7 +118,7 @@ public class TestContainerPlacementFactory {
 
   @Test
   public void testDefaultPolicy() throws IOException {
-    ContainerPlacementPolicy policy = ContainerPlacementPolicyFactory
+    PlacementPolicy policy = ContainerPlacementPolicyFactory
         .getPolicy(conf, null, null, true, null);
     Assert.assertSame(SCMContainerPlacementRandom.class, policy.getClass());
   }
@@ -125,7 +126,7 @@ public class TestContainerPlacementFactory {
   /**
    * A dummy container placement implementation for test.
    */
-  public static class DummyImpl implements ContainerPlacementPolicy {
+  public static class DummyImpl implements PlacementPolicy {
     @Override
     public List<DatanodeDetails> chooseDatanodes(
         List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 3e4508d..0687c81 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -27,8 +27,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -136,7 +135,7 @@ public class TestContainerPlacement {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         testDir.getAbsolutePath());
     conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
 
     SCMNodeManager nodeManager = createNodeManager(conf);
     SCMContainerManager containerManager =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
index 5572e9a..4ad3456 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSafeModeHandler.java
@@ -25,8 +25,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager;
 import org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -70,7 +69,7 @@ public class TestSafeModeHandler {
         .thenReturn(new HashSet<>());
     replicationManager = new ReplicationManager(
         new ReplicationManagerConfiguration(),
-        containerManager, Mockito.mock(ContainerPlacementPolicy.class),
+        containerManager, Mockito.mock(PlacementPolicy.class),
         eventQueue, new LockManager(configuration));
     scmPipelineManager = Mockito.mock(SCMPipelineManager.class);
     blockManager = Mockito.mock(BlockManagerImpl.class);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index cd975cf..6f347cf 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.ContainerOperationClient;
@@ -47,8 +47,7 @@ public class TestContainerOperations {
   public static void setup() throws Exception {
     ozoneConf = new OzoneConfiguration();
     ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
-    ozoneConf.setStorageSize(OZONE_SCM_CONTAINER_SIZE, 5, StorageUnit.GB);
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
     cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(1).build();
     storageClient = new ContainerOperationClient(ozoneConf);
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index 76eee6a..5e90e22 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -29,8 +29,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
-    ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.
     SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.protocolPB.
@@ -59,7 +58,7 @@ public class TestContainerStateMachineIdempotency {
   public static void init() throws Exception {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
     cluster =
         MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
index 7fb9825..c88dd3a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -84,7 +84,7 @@ public class TestDataScrubber {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 48ce4a6..7a564c1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocolPB
@@ -60,7 +60,7 @@ public class TestContainerSmallFile {
   public static void init() throws Exception {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index b19020f..bccfdba 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -31,8 +31,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.
-    ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.
     SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.protocolPB.
@@ -63,7 +62,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
   public static void init() throws Exception {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-        SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+        SCMContainerPlacementCapacity.class, PlacementPolicy.class);
     cluster =
         MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();




[hadoop-ozone] 04/04: HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.


sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 32af34f1946705c8260e3e07f439e03ef020733d
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Oct 29 12:46:00 2019 +0800

    HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
    
    This closes #28
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  10 +-
 .../common/src/main/resources/ozone-default.xml    |  15 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   4 +
 .../ContainerPlacementPolicyFactory.java           |   8 +-
 .../hdds/scm/node/states/Node2PipelineMap.java     |   2 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   1 +
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  89 +++++++---
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |   5 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 136 +++++++++------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   4 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  13 +-
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |   8 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  34 ++--
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |   3 +
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  15 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |   2 +-
 .../TestRatisPipelineCreateAndDestroy.java         |  24 ++-
 .../scm/pipeline/TestRatisPipelineProvider.java    | 184 +++++++++++++++++++++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  17 +-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   5 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   3 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |  12 ++
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   6 +-
 .../ozone/client/rpc/Test2WayCommitInRatis.java    |   1 +
 .../ozone/client/rpc/TestBlockOutputStream.java    |   1 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |   7 +-
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |   1 +
 .../rpc/TestContainerReplicationEndToEnd.java      |   5 +-
 .../client/rpc/TestContainerStateMachine.java      |   5 +-
 .../client/rpc/TestDeleteWithSlowFollower.java     |  12 +-
 .../client/rpc/TestFailureHandlingByClient.java    |   4 +-
 .../client/rpc/TestHybridPipelineOnDatanode.java   |   3 +-
 .../ozone/client/rpc/TestKeyInputStream.java       |   1 +
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |   8 +-
 .../rpc/TestOzoneClientRetriesOnException.java     |   1 +
 .../client/rpc/TestOzoneRpcClientAbstract.java     |   1 +
 .../ozone/client/rpc/TestWatchForCommit.java       |   3 +
 .../TestCloseContainerByPipeline.java              |   5 +
 .../TestSCMContainerPlacementPolicyMetrics.java    |   1 +
 .../hadoop/ozone/scm/node/TestQueryNode.java       |   3 +
 .../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java  |   3 +
 .../hadoop/ozone/freon/TestDataValidate.java       |   2 +-
 .../ozone/freon/TestFreonWithPipelineDestroy.java  |   1 +
 43 files changed, 518 insertions(+), 150 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index e6fed5b..17e09c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -319,7 +319,15 @@ public final class ScmConfigKeys {
   // the max number of pipelines can a single datanode be engaged in.
   public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
           "ozone.scm.datanode.max.pipeline.engagement";
-  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
+
+  // Upper limit for how many pipelines can be created.
+  // Only for test purposes for now.
+  public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+      "ozone.scm.pipeline.number.limit";
+  // Setting to zero by default means this limit doesn't take effect.
+  public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
 
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
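
For reference, a minimal sketch (not part of this patch) of how the two limits introduced above could be set programmatically, assuming the standard Hadoop Configuration#setInt API and the ScmConfigKeys constants added in this hunk:

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public class PipelineLimitConfigSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Allow each datanode to take part in at most 2 Ratis pipelines
        // (0, the default, disables this per-datanode limit).
        conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
        // Cap the number of OPEN Ratis pipelines in SCM at 10
        // (0, the default, disables this global limit).
        conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 10);
      }
    }

Both values are read once when PipelinePlacementPolicy and RatisPipelineProvider are constructed (see the hunks below), so they behave as startup/test configuration rather than runtime tunables.
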
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2ad9948..909c692 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -841,10 +841,19 @@
     </description>
   </property>
   <property>
-    <name>ozone.scm.datanode.max.pipeline.engagement</name>
-    <value>5</value>
+    <name>ozone.scm.datanode.max.pipeline.engagement</name>
+    <value>0</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Max number of pipelines a single datanode can be engaged in.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.pipeline.number.limit</name>
+    <value>0</value>
     <tag>OZONE, SCM, PIPELINE</tag>
-    <description>Max number of pipelines per datanode can be engaged in.
+    <description>Upper limit for how many pipelines can be OPEN in SCM.
+      The default of 0 means there is no limit; otherwise the value is the
+      maximum number of pipelines that may be OPEN at the same time.
     </description>
   </property>
   <property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 845bdf1..00ad58a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -196,6 +196,10 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+        } catch (SCMException se) {
+          LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+              "Datanodes may be used up.", type, factor, se);
+          break;
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
   }
 
 
-  public static PlacementPolicy getPolicy(Configuration conf,
-    final NodeManager nodeManager, NetworkTopology clusterMap,
-    final boolean fallback, SCMContainerPlacementMetrics metrics)
-    throws SCMException{
+  public static PlacementPolicy getPolicy(
+      Configuration conf, final NodeManager nodeManager,
+      NetworkTopology clusterMap, final boolean fallback,
+      SCMContainerPlacementMetrics metrics) throws SCMException{
     final Class<? extends PlacementPolicy> placementClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
             OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..18809ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -80,7 +80,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
       dn2ObjectMap.computeIfPresent(dnId,
           (k, v) -> {
             v.remove(pipeline.getId());
-            return v;
+            return v.isEmpty() ? null : v;
           });
     }
   }
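
The computeIfPresent change above relies on the java.util.Map contract: when the remapping function returns null, the whole mapping is removed, so a datanode with no remaining pipelines no longer lingers in the node-to-pipelines map. A self-contained sketch of that behaviour (the names are illustrative, not from the Ozone code):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class ComputeIfPresentSketch {
      public static void main(String[] args) {
        Map<String, Set<String>> dn2Pipelines = new HashMap<>();
        Set<String> pipelines = new HashSet<>();
        pipelines.add("pipeline-a");
        dn2Pipelines.put("dn-1", pipelines);

        // Returning null from the remapping function removes the entry,
        // so the datanode disappears once its last pipeline is removed.
        dn2Pipelines.computeIfPresent("dn-1", (k, v) -> {
          v.remove("pipeline-a");
          return v.isEmpty() ? null : v;
        });

        System.out.println(dn2Pipelines.containsKey("dn-1")); // prints false
      }
    }
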
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6873566..6952f74 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -96,6 +96,7 @@ class BackgroundPipelineCreator {
           if (scheduler.isClosed()) {
             break;
           }
+
           pipelineManager.createPipeline(type, factor);
         } catch (IOException ioe) {
           break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..23eb574 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
@@ -52,6 +53,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   static final Logger LOG =
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
+  private final PipelineStateManager stateManager;
   private final Configuration conf;
   private final int heavyNodeCriteria;
 
@@ -59,15 +61,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * Constructs a pipeline placement with considering network topology,
    * load balancing and rack awareness.
    *
-   * @param nodeManager Node Manager
+   * @param nodeManager NodeManager
+   * @param stateManager PipelineStateManager
    * @param conf        Configuration
    */
-  public PipelinePlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
+  public PipelinePlacementPolicy(final NodeManager nodeManager,
+      final PipelineStateManager stateManager, final Configuration conf) {
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
     this.conf = conf;
-    heavyNodeCriteria = conf.getInt(
+    this.stateManager = stateManager;
+    this.heavyNodeCriteria = conf.getInt(
         ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
         ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
   }
@@ -76,11 +80,46 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * Returns true if this node meets the criteria.
    *
    * @param datanodeDetails DatanodeDetails
+   * @param nodesRequired nodes required count
    * @return true if we have enough space.
    */
   @VisibleForTesting
-  boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
-    return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+  boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
+    if (heavyNodeCriteria == 0) {
+      // no limit applied.
+      return true;
+    }
+    // Datanodes from pipelines in some states can also be considered
+    // available for pipeline allocation. Thus the number of these pipelines
+    // shall be deducted from the total heaviness calculation.
+    int pipelineNumDeductable = 0;
+    Set<PipelineID> pipelines = nodeManager.getPipelines(datanodeDetails);
+    for (PipelineID pid : pipelines) {
+      Pipeline pipeline;
+      try {
+        pipeline = stateManager.getPipeline(pid);
+      } catch (PipelineNotFoundException e) {
+        LOG.error("Pipeline not found in pipeline state manager during" +
+            " pipeline creation. PipelineID: " + pid +
+            " exception: " + e.getMessage());
+        continue;
+      }
+      if (pipeline != null &&
+          pipeline.getFactor().getNumber() == nodesRequired &&
+          pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+          pipeline.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+        pipelineNumDeductable++;
+      }
+    }
+    boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+        - pipelineNumDeductable) < heavyNodeCriteria;
+    if (!meet) {
+      LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
+          "datanode: " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+          nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+          heavyNodeCriteria);
+    }
+    return meet;
   }
 
   /**
@@ -102,18 +141,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     if (excludedNodes != null) {
       healthyNodes.removeAll(excludedNodes);
     }
+    int initialHealthyNodesCount = healthyNodes.size();
     String msg;
-    if (healthyNodes.size() == 0) {
+    if (initialHealthyNodesCount == 0) {
       msg = "No healthy node found to allocate pipeline.";
       LOG.error(msg);
       throw new SCMException(msg, SCMException.ResultCodes
           .FAILED_TO_FIND_HEALTHY_NODES);
     }
 
-    if (healthyNodes.size() < nodesRequired) {
+    if (initialHealthyNodesCount < nodesRequired) {
       msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
               + " datanodes required. Found %d",
-          nodesRequired, healthyNodes.size());
+          nodesRequired, initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -121,14 +161,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
 
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
-    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
-        meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+    List<DatanodeDetails> healthyList = healthyNodes.stream()
+        .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+        .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
       msg = String.format("Unable to find enough nodes that meet " +
               "the criteria that cannot engage in more than %d pipelines." +
-              " Nodes required: %d Found: %d",
-          heavyNodeCriteria, nodesRequired, healthyList.size());
+              " Nodes required: %d Found: %d, healthy nodes count in " +
+              "NodeManager: %d.",
+          heavyNodeCriteria, nodesRequired, healthyList.size(),
+          initialHealthyNodesCount);
       LOG.error(msg);
       throw new SCMException(msg,
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -154,13 +197,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // and make sure excludedNodes are excluded from list.
     List<DatanodeDetails> healthyNodes =
         filterViableNodes(excludedNodes, nodesRequired);
-    
-    // Randomly picks nodes when all nodes are equal.
+
+    // Randomly picks nodes when all nodes are equal or factor is ONE.
     // This happens when network topology is absent or
     // all nodes are on the same rack.
     if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
-      LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
-          "Required nodes: {}", nodesRequired);
       return super.getResultSet(nodesRequired, healthyNodes);
     } else {
       // Since topology and rack awareness are available, picks nodes
@@ -188,8 +229,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // First choose an anchor nodes randomly
     DatanodeDetails anchor = chooseNode(healthyNodes);
     if (anchor == null) {
-      LOG.error("Unable to find the first healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+              "that meet the criteria. Required nodes: {}, Found nodes: {}",
           nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +245,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
         healthyNodes, exclude,
         nodeManager.getClusterNetworkTopologyMap(), anchor);
     if (nodeOnDifferentRack == null) {
-      LOG.error("Unable to find nodes on different racks that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
+      LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+              "that meet the criteria. Required nodes: {}, Found nodes: {}",
           nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +269,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     }
 
     if (results.size() < nodesRequired) {
-      LOG.error("Unable to find the required number of healthy nodes that " +
-              "meet the criteria. Required nodes: {}, Found nodes: {}",
-          nodesRequired, results.size());
+      LOG.error("Pipeline Placement: Unable to find the required number of " +
+              "healthy nodes that meet the criteria. Required nodes: {}, " +
+              "Found nodes: {}", nodesRequired, results.size());
       throw new SCMException("Unable to find required number of nodes.",
           SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
   PipelineStateMap() {
 
     // TODO: Use TreeMap for range operations?
-    pipelineMap = new HashMap<>();
-    pipeline2container = new HashMap<>();
+    pipelineMap = new ConcurrentHashMap<>();
+    pipeline2container = new ConcurrentHashMap<>();
     query2OpenPipelines = new HashMap<>();
     initializeQueryMap();
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 913a435..216cb68 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 import org.apache.hadoop.io.MultipleIOException;
@@ -44,8 +43,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
@@ -69,6 +66,9 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final PipelinePlacementPolicy placementPolicy;
+  private int pipelineNumberLimit;
+  private int maxPipelinePerDatanode;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -92,65 +92,93 @@ public class RatisPipelineProvider implements PipelineProvider {
     this.stateManager = stateManager;
     this.conf = conf;
     this.tlsConfig = tlsConfig;
+    this.placementPolicy =
+        new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+    this.pipelineNumberLimit = conf.getInt(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+    this.maxPipelinePerDatanode = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
   }
 
+  private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+      throws SCMException {
+    Set<DatanodeDetails> dnsUsed = new HashSet<>();
+    stateManager.getPipelines(ReplicationType.RATIS, factor)
+        .stream().filter(
+          p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+              p.getPipelineState().equals(PipelineState.DORMANT) ||
+              p.getPipelineState().equals(PipelineState.ALLOCATED))
+        .forEach(p -> dnsUsed.addAll(p.getNodes()));
 
-  /**
-   * Create pluggable container placement policy implementation instance.
-   *
-   * @param nodeManager - SCM node manager.
-   * @param conf - configuration.
-   * @return SCM container placement policy implementation instance.
-   */
-  @SuppressWarnings("unchecked")
-  // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
-  private static PlacementPolicy createContainerPlacementPolicy(
-      final NodeManager nodeManager, final Configuration conf) {
-    Class<? extends PlacementPolicy> implClass =
-        (Class<? extends PlacementPolicy>) conf.getClass(
-            ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
-            SCMContainerPlacementRandom.class);
+    // Get list of healthy nodes
+    List<DatanodeDetails> dns = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY)
+        .parallelStream()
+        .filter(dn -> !dnsUsed.contains(dn))
+        .limit(factor.getNumber())
+        .collect(Collectors.toList());
+    if (dns.size() < factor.getNumber()) {
+      String e = String
+          .format("Cannot create pipeline of factor %d using %d nodes." +
+                  " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+              dns.size(), dnsUsed.size(),
+              nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+      throw new SCMException(e,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return dns;
+  }
 
-    try {
-      Constructor<? extends PlacementPolicy> ctor =
-          implClass.getDeclaredConstructor(NodeManager.class,
-              Configuration.class);
-      return ctor.newInstance(nodeManager, conf);
-    } catch (RuntimeException e) {
-      throw e;
-    } catch (InvocationTargetException e) {
-      throw new RuntimeException(implClass.getName()
-          + " could not be constructed.", e.getCause());
-    } catch (Exception e) {
-//      LOG.error("Unhandled exception occurred, Placement policy will not " +
-//          "be functional.");
-      throw new IllegalArgumentException("Unable to load " +
-          "PlacementPolicy", e);
+  private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+    if (factor != ReplicationFactor.THREE) {
+      // Only put limits for Factor THREE pipelines.
+      return false;
+    }
+    // Per datanode limit
+    if (maxPipelinePerDatanode > 0) {
+      return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+          stateManager.getPipelines(ReplicationType.RATIS, factor,
+              Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+          nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) /
+          factor.getNumber();
     }
+
+    // Global limit
+    if (pipelineNumberLimit > 0) {
+      return (stateManager.getPipelines(ReplicationType.RATIS,
+          ReplicationFactor.THREE).size() - stateManager.getPipelines(
+          ReplicationType.RATIS, ReplicationFactor.THREE,
+          Pipeline.PipelineState.CLOSED).size()) >
+          (pipelineNumberLimit - stateManager.getPipelines(
+              ReplicationType.RATIS, ReplicationFactor.ONE).size());
+    }
+
+    return false;
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
-    // Get set of datanodes already used for ratis pipeline
-    Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
-        p -> p.getPipelineState().equals(PipelineState.OPEN) ||
-            p.getPipelineState().equals(PipelineState.DORMANT) ||
-            p.getPipelineState().equals(PipelineState.ALLOCATED))
-        .forEach(p -> dnsUsed.addAll(p.getNodes()));
+    if (exceedPipelineNumberLimit(factor)) {
+      throw new SCMException("Ratis pipeline number meets the limit: " +
+          pipelineNumberLimit + " factor : " +
+          factor.getNumber(),
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
 
-    // Get list of healthy nodes
-    List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY)
-            .parallelStream()
-            .filter(dn -> !dnsUsed.contains(dn))
-            .limit(factor.getNumber())
-            .collect(Collectors.toList());
-    if (dns.size() < factor.getNumber()) {
-      String e = String
-          .format("Cannot create pipeline of factor %d using %d nodes.",
-              factor.getNumber(), dns.size());
-      throw new InsufficientDatanodesException(e);
+    List<DatanodeDetails> dns;
+
+    switch(factor) {
+    case ONE:
+      dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+      break;
+    case THREE:
+      dns = placementPolicy.chooseDatanodes(null,
+          null, factor.getNumber(), 0);
+      break;
+    default:
+      throw new IllegalStateException("Unknown factor: " + factor.name());
     }
 
     Pipeline pipeline = create(factor, dns);
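
To make the per-datanode branch of exceedPipelineNumberLimit above concrete with sample numbers (purely illustrative): with ozone.scm.datanode.max.pipeline.engagement set to 2 and 9 HEALTHY datanodes, the cap on non-CLOSED factor-THREE Ratis pipelines works out to 2 * 9 / 3 = 6; once that count exceeds 6, create(ReplicationFactor.THREE) throws an SCMException with FAILED_TO_FIND_SUITABLE_NODE instead of allocating a new pipeline. Factor-ONE pipelines skip this check entirely and are only constrained by pickNodesNeverUsed.
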
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 497e717..04393a1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -66,8 +66,8 @@ public final class RatisPipelineUtils {
       try {
         destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
       } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
+        LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+            pipeline.getId(), dn, e.getMessage());
       }
     }
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..b41c595 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -54,10 +54,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import static org.apache.hadoop.hdds.scm
-    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
-    .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
 
 /**
@@ -97,8 +93,8 @@ public class SCMPipelineManager implements PipelineManager {
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
         new BackgroundPipelineCreator(this, scheduler, conf);
-    int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
-        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+    int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+        ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     final File metaDir = ServerUtils.getScmDbDir(conf);
     final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
     this.pipelineStore =
@@ -160,10 +156,9 @@ public class SCMPipelineManager implements PipelineManager {
       metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
       return pipeline;
-    } catch (InsufficientDatanodesException idEx) {
-      throw idEx;
     } catch (IOException ex) {
       metrics.incNumPipelineCreationFailed();
+      LOG.error("Pipeline creation failed.", ex);
       throw ex;
     } finally {
       lock.writeLock().unlock();
@@ -172,7 +167,7 @@ public class SCMPipelineManager implements PipelineManager {
 
   @Override
   public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
-                                 List<DatanodeDetails> nodes) {
+      List<DatanodeDetails> nodes) {
     // This will mostly be used to create dummy pipeline for SimplePipelines.
     // We don't update the metrics for SimplePipelines.
     lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index b6a1445..9427391 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -125,6 +125,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Get the number of pipelines created.
+   * @return number of pipelines created
+   */
+  long getNumPipelineCreated() {
+    return numPipelineCreated.value();
+  }
+
+  /**
    * Increments number of failed pipeline creation count.
    */
   void incNumPipelineCreationFailed() {
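
The new package-private getNumPipelineCreated() accessor is mainly there so tests can assert on the counter. A rough sketch of such an assertion (the getMetrics/getLongCounter helpers come from MetricsAsserts as used elsewhere in this patch; the record and counter names are assumed to follow the SCMPipelineMetrics field names):

    MetricsRecordBuilder metrics =
        getMetrics(SCMPipelineMetrics.class.getSimpleName());
    // Counter names are assumed to mirror the @Metric fields of SCMPipelineMetrics.
    long created = getLongCounter("NumPipelineCreated", metrics);
    long failed = getLongCounter("NumPipelineCreationFailed", metrics);
    // e.g. verify at least one pipeline was created and none failed so far
    Assert.assertTrue(created >= 1);
    Assert.assertEquals(0, failed);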
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 2f9a66f..3b31454 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdds.scm.safemode;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -54,8 +54,9 @@ public class HealthyPipelineSafeModeRule
   private final PipelineManager pipelineManager;
   private int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Map<PipelineID, Boolean> processedPipelines = new HashMap<>();
   private final double healthyPipelinesPercent;
+  private final Set<PipelineID> processedPipelineIDs =
+      new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
@@ -121,29 +122,34 @@ public class HealthyPipelineSafeModeRule
     // from datanode again during threshold calculation.
     Preconditions.checkNotNull(pipelineReportFromDatanode);
 
+    Pipeline pipeline;
     PipelineReportsProto pipelineReport =
         pipelineReportFromDatanode.getReport();
 
     for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID.getFromProtobuf(
-          report.getPipelineID());
-      Pipeline pipeline;
+      PipelineID pipelineID = PipelineID
+          .getFromProtobuf(report.getPipelineID());
+      if (processedPipelineIDs.contains(pipelineID)) {
+        continue;
+      }
+
       try {
         pipeline = pipelineManager.getPipeline(pipelineID);
       } catch (PipelineNotFoundException e) {
         continue;
       }
 
-      if (!processedPipelines.containsKey(pipelineID)) {
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
             report.getIsLeader()) {
-          // If the pipeline gets reported with a leader we mark it as healthy
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-          processedPipelines.put(pipelineID, Boolean.TRUE);
-        }
+        // If the pipeline is reported with a leader, mark the pipeline
+        // as healthy.
+        currentHealthyPipelineCount++;
+        getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+        processedPipelineIDs.add(pipelineID);
       }
+
     }
+
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
           "SCM in safe mode. Healthy pipelines reported count is {}, " +
@@ -154,7 +160,7 @@ public class HealthyPipelineSafeModeRule
 
   @Override
   protected void cleanup() {
-    processedPipelines.clear();
+    processedPipelineIDs.clear();
   }
 
   @VisibleForTesting
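
The HealthyPipelineSafeModeRule change replaces the Map<PipelineID, Boolean> with a Set<PipelineID> because only membership matters: each pipeline should contribute to the healthy count at most once, however many datanodes report it. A minimal, self-contained sketch of that dedupe-and-count pattern (plain Java with a hypothetical report shape; this is not the actual rule class):

    import java.util.HashSet;
    import java.util.Set;
    import java.util.UUID;

    class HealthyPipelineCounter {
      private final Set<UUID> processed = new HashSet<>();
      private int healthyCount = 0;

      // Count a pipeline once, and only when a factor-THREE pipeline is
      // reported by its leader replica.
      void onReport(UUID pipelineId, int replicationFactor, boolean isLeader) {
        if (processed.contains(pipelineId)) {
          return;   // already counted this pipeline
        }
        if (replicationFactor == 3 && isLeader) {
          healthyCount++;
          processed.add(pipelineId);
        }
      }

      int getHealthyCount() {
        return healthyCount;
      }
    }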
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index c140119..7e88043 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -65,6 +65,8 @@ import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test DeadNodeHandler.
  */
@@ -85,6 +87,7 @@ public class TestDeadNodeHandler {
     storageDir = GenericTestUtils.getTempPath(
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..1e34039 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test for PipelinePlacementPolicy.
  */
 public class TestPipelinePlacementPolicy {
   private MockNodeManager nodeManager;
+  private OzoneConfiguration conf;
   private PipelinePlacementPolicy placementPolicy;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
 
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true,
         PIPELINE_PLACEMENT_MAX_NODES_COUNT);
-    placementPolicy =
-        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+    conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
+    placementPolicy = new PipelinePlacementPolicy(
+        nodeManager, new PipelineStateManager(conf), conf);
   }
 
   @Test
@@ -123,7 +128,7 @@ public class TestPipelinePlacementPolicy {
   public void testHeavyNodeShouldBeExcluded() throws SCMException{
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
-    int nodesRequired = healthyNodes.size()/2;
+    int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
     // only minority of healthy NODES are heavily engaged in pipelines.
     int minorityHeavy = healthyNodes.size()/2 - 1;
     List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
     }
 
     int considerHeavyCount =
-        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+        conf.getInt(
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+            ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
 
     Node2PipelineMap mockMap = new Node2PipelineMap();
     for (DatanodeDetails node : nodes) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..9bccb1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 6ace90c..cbe450e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.HddsDatanodeService;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -34,6 +35,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
 /**
@@ -48,9 +50,12 @@ public class TestRatisPipelineCreateAndDestroy {
   public void init(int numDatanodes) throws Exception {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         GenericTestUtils.getRandomizedTempPath());
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
             .setNumDatanodes(numDatanodes)
-            .setHbInterval(1000)
+            .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+            .setHbInterval(2000)
             .setHbProcessorInterval(1000)
             .build();
     cluster.waitForClusterToBeReady();
@@ -103,7 +108,9 @@ public class TestRatisPipelineCreateAndDestroy {
     } catch (IOException ioe) {
       // As now all datanodes are shutdown, they move to stale state, there
       // will be no sufficient datanodes to create the pipeline.
-      Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+      Assert.assertTrue(ioe instanceof SCMException);
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          ((SCMException) ioe).getResult());
     }
 
     // make sure pipelines is destroyed
@@ -116,9 +123,14 @@ public class TestRatisPipelineCreateAndDestroy {
     for (Pipeline pipeline : pipelines) {
       pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
     }
-    // make sure pipelines is created after node start
-    pipelineManager.triggerPipelineCreation();
-    waitForPipelines(1);
+
+    if (cluster.getStorageContainerManager()
+        .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) >=
+        HddsProtos.ReplicationFactor.THREE.getNumber()) {
+      // make sure pipelines is created after node start
+      pipelineManager.triggerPipelineCreation();
+      waitForPipelines(1);
+    }
   }
 
   private void waitForPipelines(int numPipelines)
@@ -126,6 +138,6 @@ public class TestRatisPipelineCreateAndDestroy {
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() == numPipelines, 100, 40000);
+        .size() >= numPipelines, 100, 40000);
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
new file mode 100644
index 0000000..7862605
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
+/**
+ * Test for RatisPipelineProvider.
+ */
+public class TestRatisPipelineProvider {
+
+  private NodeManager nodeManager;
+  private PipelineProvider provider;
+  private PipelineStateManager stateManager;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true, 10);
+    stateManager = new PipelineStateManager(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
+    provider = new MockRatisPipelineProvider(nodeManager,
+        stateManager, conf);
+  }
+
+  private void createPipelineAndAssertions(
+          HddsProtos.ReplicationFactor factor) throws IOException {
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getPipelineState(),
+            Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    nodeManager.addPipeline(pipeline1);
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(pipeline1.getPipelineState(),
+            Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelineWithFactor() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    nodeManager.addPipeline(pipeline1);
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(pipeline1.getPipelineState(),
+        Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelineWithFactorThree() throws IOException {
+    createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+  }
+
+  @Test
+  public void testCreatePipelineWithFactorOne() throws IOException {
+    createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+  }
+
+  private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i = 0; i < nodeCount; i++) {
+      nodes.add(TestUtils.randomDatanodeDetails());
+    }
+    return nodes;
+  }
+
+  @Test
+  public void testCreatePipelineWithNodes() {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline =
+        provider.create(factor, createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(
+        pipeline.getPipelineState(), Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelinesDnExclude() throws IOException {
+    List<DatanodeDetails> allHealthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    int totalHealthyNodesCount = allHealthyNodes.size();
+
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+
+    List<DatanodeDetails> closePipelineDns = new ArrayList<>();
+    for (int i = 0; i < totalHealthyNodesCount/3; i++) {
+      List<DatanodeDetails> pipelineDns = allHealthyNodes
+          .subList(3 * i, 3 * (i + 1));
+
+      Pipeline.PipelineState state;
+      if (i % 2 == 0) {
+        state = Pipeline.PipelineState.OPEN;
+      } else {
+        state = Pipeline.PipelineState.CLOSED;
+        closePipelineDns.addAll(pipelineDns);
+      }
+
+      Pipeline openPipeline = Pipeline.newBuilder()
+          .setType(HddsProtos.ReplicationType.RATIS)
+          .setFactor(factor)
+          .setNodes(pipelineDns)
+          .setState(state)
+          .setId(PipelineID.randomId())
+          .build();
+
+
+      stateManager.addPipeline(openPipeline);
+      nodeManager.addPipeline(openPipeline);
+    }
+
+    Pipeline pipeline = provider.create(factor);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+    List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
+
+    // Since we have only 10 DNs, at least 1 pipeline node should have been
+    // from the closed pipeline DN list.
+    Assert.assertTrue(pipelineNodes.parallelStream().filter(
+        closePipelineDns::contains).count() > 0);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 105d2e2..7aba39a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 
@@ -34,12 +35,13 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -59,6 +61,7 @@ public class TestSCMPipelineManager {
   @Before
   public void setUp() throws Exception {
     conf = new OzoneConfiguration();
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -188,10 +191,10 @@ public class TestSCMPipelineManager {
       pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
       Assert.fail();
-    } catch (InsufficientDatanodesException idEx) {
-      Assert.assertEquals(
-          "Cannot create pipeline of factor 3 using 1 nodes.",
-          idEx.getMessage());
+    } catch (SCMException ioe) {
+      // pipeline creation failed this time.
+      Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+          ioe.getResult());
     }
 
     metrics = getMetrics(
@@ -201,7 +204,7 @@ public class TestSCMPipelineManager {
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
-    Assert.assertTrue(numPipelineCreateFailed == 0);
+    Assert.assertTrue(numPipelineCreateFailed == 1);
     
     // clean up
     pipelineManager.close();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67a..baeee6a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@ public class TestSCMRestart {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    int numOfNodes = 4;
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(4)
+        .setNumDatanodes(numOfNodes)
+        // allow only one FACTOR THREE pipeline.
+        .setTotalPipelineNumLimit(numOfNodes + 1)
         .setHbInterval(1000)
         .setHbProcessorInterval(1000)
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 7cfd555..09c633d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.junit.Assert.fail;
 
 /**
@@ -62,6 +63,8 @@ public class TestSCMSafeModeWithPipelineRules {
         true);
     conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 50);
+
     clusterBuilder = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numDatanodes)
         .setHbInterval(1000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 59cef37..5784196 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -238,6 +238,7 @@ public interface MiniOzoneCluster {
     protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
     protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
     protected static final int ACTIVE_OMS_NOT_SET = -1;
+    protected static final int DEFAULT_PIPELINE_LIMIT = 3;
 
     protected final OzoneConfiguration conf;
     protected String path;
@@ -265,6 +266,7 @@ public interface MiniOzoneCluster {
     protected int numOfDatanodes = 1;
     protected boolean  startDataNodes = true;
     protected CertificateClient certClient;
+    protected int pipelineNumLimit = DEFAULT_PIPELINE_LIMIT;
 
     protected Builder(OzoneConfiguration conf) {
       this.conf = conf;
@@ -352,6 +354,16 @@ public interface MiniOzoneCluster {
     }
 
     /**
+     * Sets the upper limit on the total number of pipelines in the cluster.
+     * @param val maximum number of pipelines
+     * @return MiniOzoneCluster.Builder
+     */
+    public Builder setTotalPipelineNumLimit(int val) {
+      pipelineNumLimit = val;
+      return this;
+    }
+
+    /**
      * Sets the number of HeartBeat Interval of Datanodes, the value should be
      * in MilliSeconds.
      *
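
The new setTotalPipelineNumLimit(int) knob lets a test cap how many pipelines SCM creates across the whole mini cluster. A usage sketch, assuming a test method that throws Exception and using only builder calls that appear in this patch:

    OzoneConfiguration conf = new OzoneConfiguration();
    MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(7)
        .setTotalPipelineNumLimit(10)   // global cap on SCM pipelines
        .build();
    cluster.waitForClusterToBeReady();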
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2813711..f9938c5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -531,6 +531,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
           streamBufferMaxSize.get(), streamBufferSizeUnit.get());
       conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
           streamBufferSizeUnit.get());
+      // MiniOzoneCluster should have a global upper limit on pipelines.
+      conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+          pipelineNumLimit == DEFAULT_PIPELINE_LIMIT ?
+              2 * numOfDatanodes : pipelineNumLimit);
       configureTrace();
     }
 
@@ -542,7 +546,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
      * Creates a new StorageContainerManager instance.
      *
      * @return {@link StorageContainerManager}
-     *
+     *
      * @throws IOException
      */
     StorageContainerManager createSCM()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index fd2cea3..1eef382 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -81,6 +81,7 @@ public class Test2WayCommitInRatis {
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 399b977..444f362 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -84,6 +84,7 @@ public class TestBlockOutputStream {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 8649837..3bcd81f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
 import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -89,9 +90,11 @@ public class TestBlockOutputStreamWithFailures {
     conf.setQuietMode(false);
     conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
         StorageUnit.MB);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
-        .setBlockSize(blockSize).setChunkSize(chunkSize)
-        .setStreamBufferFlushSize(flushSize)
+        .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
+        .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
         .setStreamBufferMaxSize(maxFlushSize)
         .setStreamBufferSizeUnit(StorageUnit.BYTES).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 344c51e..d76155b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -96,6 +96,7 @@ public class TestCommitWatcher {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 0886d26..a8f61f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -56,6 +56,7 @@ import java.util.function.Predicate;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 
 /**
  * Tests delete key operation with a slow follower in the datanode
@@ -99,10 +100,12 @@ public class TestContainerReplicationEndToEnd {
         1000, TimeUnit.SECONDS);
     conf.setLong("hdds.scm.replication.thread.interval",
         containerReportInterval);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+            .setTotalPipelineNumLimit(6).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
     cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 82c4910..3b806dd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@ public class TestContainerStateMachine {
     baseDir.mkdirs();
 
     conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
-  //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+    //  conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624..644469e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower {
   private static String bucketName;
   private static String path;
   private static XceiverClientManager xceiverClientManager;
+  private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -111,10 +112,13 @@ public class TestDeleteWithSlowFollower {
         1000, TimeUnit.SECONDS);
     conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
         1, TimeUnit.SECONDS);
-
     conf.setQuietMode(false);
-    cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+    int numOfDatanodes = 3;
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(numOfDatanodes)
+            .setTotalPipelineNumLimit(
+                numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+            .setHbInterval(100)
             .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -176,7 +180,7 @@ public class TestDeleteWithSlowFollower {
         cluster.getStorageContainerManager().getPipelineManager()
             .getPipelines(HddsProtos.ReplicationType.RATIS,
                 HddsProtos.ReplicationFactor.THREE);
-    Assert.assertTrue(pipelineList.size() == 1);
+    Assert.assertTrue(pipelineList.size() >= FACTOR_THREE_PIPELINE_COUNT);
     Pipeline pipeline = pipelineList.get(0);
     for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
       if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index edb796b..7b6d555 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,7 @@ public class TestFailureHandlingByClient {
         1, TimeUnit.SECONDS);
     conf.setBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -105,7 +107,7 @@ public class TestFailureHandlingByClient {
         Collections.singleton(HddsUtils.getHostName(conf))).get(0),
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(10).build();
+        .setNumDatanodes(10).setTotalPipelineNumLimit(15).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e..75af061 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+        .setTotalPipelineNumLimit(5).build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 6e7e328..9e19f1c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -82,6 +82,7 @@ public class TestKeyInputStream {
         StorageUnit.MB);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(5)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 9666247..64047ba 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -47,8 +47,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
 
 /**
  * Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -87,10 +86,13 @@ public class TestMultiBlockWritesWithDnFailures {
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
 
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(datanodes).build();
+        .setNumDatanodes(datanodes)
+        .setTotalPipelineNumLimit(0)
+        .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
     client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5f6d494..0bc94d0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -91,6 +91,7 @@ public class TestOzoneClientRetriesOnException {
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 2163773..fd2800f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -165,6 +165,7 @@ public abstract class TestOzoneRpcClientAbstract {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(3)
+        .setTotalPipelineNumLimit(10)
         .setScmId(scmId)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index a5d601e..9b7ed70 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -56,6 +56,7 @@ import java.util.concurrent.TimeoutException;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 
 /**
  * This class verifies the watchForCommit Handling by xceiverClient.
@@ -92,10 +93,12 @@ public class TestWatchForCommit {
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
         1, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
 
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setBlockSize(blockSize)
         .setChunkSize(chunkSize)
         .setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c..3cafb7d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
 /**
  * Test container closing.
  */
@@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline {
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(10)
+        .setTotalPipelineNumLimit(15)
         .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807..e677544 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
         "/rack1");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(4)
+        .setTotalPipelineNumLimit(10)
         .build();
     cluster.waitForClusterToBeReady();
     metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index ecc2b3e..a882dcd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -76,9 +77,11 @@ public class TestQueryNode {
     conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
     conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(numOfDatanodes)
+        .setTotalPipelineNumLimit(numOfDatanodes + numOfDatanodes/2)
         .build();
     cluster.waitForClusterToBeReady();
     scmClient = new ContainerOperationClient(conf);
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index fcabc67..b3bbe3b 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -97,6 +97,7 @@ public class TestOzoneFsHAURLs {
     conf.setTimeDuration(
         OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
         LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
 
     OMStorage omStore = new OMStorage(conf);
     omStore.setClusterId(clusterId);
@@ -106,6 +107,8 @@ public class TestOzoneFsHAURLs {
 
     // Start the cluster
     cluster = MiniOzoneCluster.newHABuilder(conf)
+        .setNumDatanodes(7)
+        .setTotalPipelineNumLimit(10)
         .setClusterId(clusterId)
         .setScmId(scmId)
         .setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822..3e1c826 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
   static void startCluster(OzoneConfiguration conf) throws Exception {
     conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(5).build();
+        .setNumDatanodes(5).setTotalPipelineNumLimit(8).build();
     cluster.waitForClusterToBeReady();
   }
 
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab6..bd30d4e 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public class TestFreonWithPipelineDestroy {
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
       .setNumDatanodes(3)
+      .setTotalPipelineNumLimit(8)
       .build();
     cluster.waitForClusterToBeReady();
   }




[hadoop-ozone] 01/04: HDDS-1577. Add default pipeline placement policy implementation. (#1366)

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 748cb018a0b20b0e0993565ce15c8ad908d6d1e2
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Thu Sep 5 11:51:40 2019 +0800

    HDDS-1577. Add default pipeline placement policy implementation. (#1366)
    
    
    
    (cherry picked from commit b640a5f6d53830aee4b9c2a7d17bf57c987962cd)
---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   5 +
 .../common/src/main/resources/ozone-default.xml    |   7 +
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  14 +
 .../hadoop/hdds/scm/node/NodeStateManager.java     |   9 +
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  19 ++
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   4 +-
 .../hdds/scm/node/states/Node2PipelineMap.java     |  12 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 338 +++++++++++++++++++++
 .../hadoop/hdds/scm/container/MockNodeManager.java |  36 ++-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 197 ++++++++++++
 .../testutils/ReplicationNodeManagerMock.java      |  16 +
 11 files changed, 654 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 3c35e56..e6fed5b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -315,6 +315,11 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
       "ozone.scm.pipeline.owner.container.count";
   public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
+  // Pipeline placement policy:
+  // the max number of pipelines a single datanode can be engaged in.
+  public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
+          "ozone.scm.datanode.max.pipeline.engagement";
+  public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
 
   public static final String
       OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8110242..94e8557 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -839,6 +839,13 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.datanode.max.pipeline.engagement</name>
+    <value>5</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>Max number of pipelines a single datanode can be engaged in.
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.container.size</name>
     <value>5GB</value>
     <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
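
Tests in this patch typically tighten the same knob programmatically rather than editing ozone-site.xml. A sketch (the key and its default of 5 are as introduced above; the value 2 is only an example):

    OzoneConfiguration conf = new OzoneConfiguration();
    // Allow each datanode to participate in at most two pipelines.
    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);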
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index fd8bb87..37562fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -118,6 +119,13 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails);
 
   /**
+   * Get the count of pipelines a datanode is associated with.
+   * @param datanodeDetails DatanodeDetails
+   * @return The number of pipelines
+   */
+  int getPipelinesCount(DatanodeDetails datanodeDetails);
+
+  /**
    * Add pipeline information in the NodeManager.
    * @param pipeline - Pipeline to be added
    */
@@ -199,4 +207,10 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @return the given datanode, or empty list if none found
    */
   List<DatanodeDetails> getNodesByAddress(String address);
+
+  /**
+   * Get the network topology cluster map for this node manager.
+   * @return cluster map
+   */
+  NetworkTopology getClusterNetworkTopologyMap();
 }
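
The new getPipelinesCount() accessor gives placement logic a cheap way to skip datanodes that are already saturated with pipelines. A simplified filtering sketch (not the actual PipelinePlacementPolicy code; the helper name, the java.util imports and the limit parameter are illustrative):

    // Keep only datanodes whose current pipeline engagement is below the
    // configured ozone.scm.datanode.max.pipeline.engagement value.
    static List<DatanodeDetails> filterViableNodes(NodeManager nodeManager,
        List<DatanodeDetails> healthyNodes, int maxEngagement) {
      List<DatanodeDetails> viable = new ArrayList<>();
      for (DatanodeDetails dn : healthyNodes) {
        if (nodeManager.getPipelinesCount(dn) < maxEngagement) {
          viable.add(dn);
        }
      }
      return viable;
    }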
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 954cb0e..9d2a9f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -284,6 +284,15 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
+   * Get the count of pipelines associated with a single datanode.
+   * @param datanodeDetails single datanode
+   * @return number of pipelines associated with it
+   */
+  public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+    return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+  }
+
+  /**
    * Get information about the node.
    *
    * @param datanodeDetails DatanodeDetails
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index f077e72..66cca46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -510,6 +510,16 @@ public class SCMNodeManager implements NodeManager {
   }
 
   /**
+   * Get the count of pipelines a datanode is associated with.
+   * @param datanodeDetails DatanodeDetails
+   * @return The number of pipelines
+   */
+  @Override
+  public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+    return nodeStateManager.getPipelinesCount(datanodeDetails);
+  }
+
+  /**
    * Add pipeline information in the NodeManager.
    *
    * @param pipeline - Pipeline to be added
@@ -643,6 +653,15 @@ public class SCMNodeManager implements NodeManager {
     return results;
   }
 
+  /**
+   * Get cluster map as in network topology for this node manager.
+   * @return cluster map
+   */
+  @Override
+  public NetworkTopology getClusterNetworkTopologyMap() {
+    return clusterMap;
+  }
+
   private String nodeResolve(String hostname) {
     List<String> hosts = new ArrayList<>(1);
     hosts.add(hostname);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
index 37525b0..57a377d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -67,6 +67,7 @@ public class Node2ObjectsMap<T> {
    * @param datanodeID   -- Datanode UUID
    * @param containerIDs - List of ContainerIDs.
    */
+  @VisibleForTesting
   public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
       throws SCMException {
     Preconditions.checkNotNull(containerIDs);
@@ -83,7 +84,8 @@ public class Node2ObjectsMap<T> {
    *
    * @param datanodeID - Datanode ID.
    */
-  void removeDatanode(UUID datanodeID) {
+  @VisibleForTesting
+  public void removeDatanode(UUID datanodeID) {
     Preconditions.checkNotNull(datanodeID);
     dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index f8633f9..714188d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -42,7 +42,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
   }
 
   /**
-   * Returns null if there no pipelines associated with this datanode ID.
+   * Returns null if there are no pipelines associated with this datanode ID.
    *
    * @param datanode - UUID
    * @return Set of pipelines or Null.
@@ -52,6 +52,16 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
   }
 
   /**
+   * Return 0 if there are no pipelines associated with this datanode ID.
+   * @param datanode - UUID
+   * @return Number of pipelines or 0.
+   */
+  public int getPipelinesCount(UUID datanode) {
+    Set<PipelineID> pipelines = getObjects(datanode);
+    return pipelines == null ? 0 : pipelines.size();
+  }
+
+  /**
    * Adds a pipeline entry to a given dataNode in the map.
    *
    * @param pipeline Pipeline to be added
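
A small hedged example of the new counter's contract (not part of the patch; the class name is illustrative): a datanode with no registered pipelines yields 0 rather than null.

    import java.util.UUID;
    import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;

    public class PipelinesCountExample {
      public static void main(String[] args) {
        Node2PipelineMap map = new Node2PipelineMap();
        // Unknown datanode UUID: getObjects() returns null, so the count is 0.
        System.out.println(map.getPipelinesCount(UUID.randomUUID())); // prints 0
      }
    }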
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
new file mode 100644
index 0000000..cb9954d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline placement policy that chooses datanodes based on load balancing
+ * and network topology to support pipeline creation.
+ * <p>
+ * 1. Get a list of healthy nodes.
+ * 2. Filter out nodes that are too heavily engaged in other pipelines.
+ * 3. Choose an anchor node among the viable nodes.
+ * 4. Choose other nodes around the anchor node based on network topology.
+ */
+public final class PipelinePlacementPolicy extends SCMCommonPolicy {
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(PipelinePlacementPolicy.class);
+  private final NodeManager nodeManager;
+  private final Configuration conf;
+  private final int heavyNodeCriteria;
+
+  /**
+   * Constructs a pipeline placement policy that considers network topology,
+   * load balancing and rack awareness.
+   *
+   * @param nodeManager Node Manager
+   * @param conf        Configuration
+   */
+  public PipelinePlacementPolicy(
+      final NodeManager nodeManager, final Configuration conf) {
+    super(nodeManager, conf);
+    this.nodeManager = nodeManager;
+    this.conf = conf;
+    heavyNodeCriteria = conf.getInt(
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
+  }
+
+  /**
+   * Returns true if this node meets the criteria.
+   *
+   * @param datanodeDetails DatanodeDetails
+   * @return true if the datanode is not engaged in too many pipelines.
+   */
+  @VisibleForTesting
+  boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
+    return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+  }
+
+  /**
+   * Filter the datanodes down to viable nodes, i.e. nodes that are
+   * 1. healthy, and
+   * 2. not too heavily engaged in other pipelines.
+   *
+   * @param excludedNodes - excluded nodes
+   * @param nodesRequired - number of datanodes required.
+   * @return a list of viable nodes
+   * @throws SCMException when there are not enough viable nodes
+   */
+  List<DatanodeDetails> filterViableNodes(
+      List<DatanodeDetails> excludedNodes, int nodesRequired)
+      throws SCMException {
+    // get nodes in HEALTHY state
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    if (excludedNodes != null) {
+      healthyNodes.removeAll(excludedNodes);
+    }
+    String msg;
+    if (healthyNodes.size() == 0) {
+      msg = "No healthy node found to allocate pipeline.";
+      LOG.error(msg);
+      throw new SCMException(msg, SCMException.ResultCodes
+          .FAILED_TO_FIND_HEALTHY_NODES);
+    }
+
+    if (healthyNodes.size() < nodesRequired) {
+      msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+              + " datanodes required. Found %d",
+          nodesRequired, healthyNodes.size());
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    // filter nodes that meet the pipeline engagement criterion.
+    // Pipeline placement doesn't take node space left into account.
+    List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
+        meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+
+    if (healthyList.size() < nodesRequired) {
+      msg = String.format("Unable to find enough nodes that meet " +
+              "the criteria that cannot engage in more than %d pipelines." +
+              " Nodes required: %d Found: %d",
+          heavyNodeCriteria, nodesRequired, healthyList.size());
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return healthyList;
+  }
+
+  /**
+   * Pipeline placement chooses datanodes to join the pipeline.
+   *
+   * @param excludedNodes - excluded nodes
+   * @param favoredNodes  - list of nodes preferred.
+   * @param nodesRequired - number of datanodes required.
+   * @param sizeRequired  - size required for the container or block.
+   * @return a list of chosen datanodeDetails
+   * @throws SCMException when not enough nodes can be chosen
+   */
+  @Override
+  public List<DatanodeDetails> chooseDatanodes(
+      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+      int nodesRequired, final long sizeRequired) throws SCMException {
+    // get a list of viable nodes based on criteria
+    List<DatanodeDetails> healthyNodes =
+        filterViableNodes(excludedNodes, nodesRequired);
+
+    List<DatanodeDetails> results = new ArrayList<>();
+
+    // Randomly picks nodes when all nodes are equal.
+    // This happens when network topology is absent or
+    // all nodes are on the same rack.
+    if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
+      LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
+          "Required nodes: {}", nodesRequired);
+      results = super.getResultSet(nodesRequired, healthyNodes);
+      if (results.size() < nodesRequired) {
+        LOG.error("Unable to find the required number of healthy nodes that " +
+                "meet the criteria. Required nodes: {}, Found nodes: {}",
+            nodesRequired, results.size());
+        throw new SCMException("Unable to find required number of nodes.",
+            SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+      }
+      return results;
+    }
+
+    // Since nodes are widely distributed, the results should be selected
+    // based on distance in topology, rack awareness and load balancing.
+    List<DatanodeDetails> exclude = new ArrayList<>();
+    exclude.addAll(excludedNodes);
+    // First choose an anchor node randomly.
+    DatanodeDetails anchor = chooseNode(healthyNodes);
+    if (anchor == null) {
+      LOG.error("Unable to find the first healthy nodes that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    results.add(anchor);
+    exclude.add(anchor);
+    nodesRequired--;
+
+    // Choose the second node on a different rack from the anchor.
+    DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+        healthyNodes, excludedNodes,
+        nodeManager.getClusterNetworkTopologyMap(), anchor);
+    if (nodeOnDifferentRack == null) {
+      LOG.error("Unable to find nodes on different racks that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    results.add(nodeOnDifferentRack);
+    exclude.add(nodeOnDifferentRack);
+    nodesRequired--;
+
+    // Then choose nodes close to anchor based on network topology
+    for (int x = 0; x < nodesRequired; x++) {
+      // invoke the choose function defined in the derived classes.
+      DatanodeDetails pick = chooseNodeFromNetworkTopology(
+          nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+      if (pick != null) {
+        results.add(pick);
+        // exclude the picked node for next time
+        exclude.add(pick);
+      }
+    }
+
+    if (results.size() < nodesRequired) {
+      LOG.error("Unable to find the required number of healthy nodes that " +
+              "meet the criteria. Required nodes: {}, Found nodes: {}",
+          nodesRequired, results.size());
+      throw new SCMException("Unable to find required number of nodes.",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return results;
+  }
+
+  /**
+   * Find a node from the healthy list and return it after removing it from the
+   * list that we are operating on.
+   *
+   * @param healthyNodes - List of healthy nodes we can choose from.
+   * @return chosen DatanodeDetails
+   */
+  @Override
+  public DatanodeDetails chooseNode(
+      List<DatanodeDetails> healthyNodes) {
+    int firstNodeNdx = getRand().nextInt(healthyNodes.size());
+    int secondNodeNdx = getRand().nextInt(healthyNodes.size());
+
+    DatanodeDetails datanodeDetails;
+    // There is a possibility that both numbers will be the same.
+    // If that is so, we just return the node.
+    if (firstNodeNdx == secondNodeNdx) {
+      datanodeDetails = healthyNodes.get(firstNodeNdx);
+    } else {
+      DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
+      DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
+      SCMNodeMetric firstNodeMetric =
+          nodeManager.getNodeStat(firstNodeDetails);
+      SCMNodeMetric secondNodeMetric =
+          nodeManager.getNodeStat(secondNodeDetails);
+      datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
+          ? firstNodeDetails : secondNodeDetails;
+    }
+    // the pick is decided and it should be removed from candidates.
+    healthyNodes.remove(datanodeDetails);
+    return datanodeDetails;
+  }
+
+  /**
+   * Choose a node on a different rack from the anchor, based on rack awareness.
+   * If no node on a different rack can be found, return null.
+   * @param healthyNodes healthy nodes
+   * @param excludedNodes excluded nodes
+   * @param networkTopology network topology
+   * @param anchor anchor node
+   * @return a node on a different rack, or null if none can be found
+   */
+  @VisibleForTesting
+  protected DatanodeDetails chooseNodeBasedOnRackAwareness(
+      List<DatanodeDetails> healthyNodes,  List<DatanodeDetails> excludedNodes,
+      NetworkTopology networkTopology, DatanodeDetails anchor) {
+    Preconditions.checkArgument(networkTopology != null);
+    if (checkAllNodesAreEqual(networkTopology)) {
+      return null;
+    }
+
+    for (DatanodeDetails node : healthyNodes) {
+      if (excludedNodes.contains(node)
+          || networkTopology.isSameParent(anchor, node)) {
+        continue;
+      } else {
+        // the pick is decided and it should be removed from candidates.
+        healthyNodes.remove(node);
+        return node;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Check if all nodes are equal in topology.
+   * They are equal when the network topology is absent or all nodes are on
+   * the same rack.
+   * @param topology network topology
+   * @return true when all nodes are equal
+   */
+  private boolean checkAllNodesAreEqual(NetworkTopology topology) {
+    if (topology == null) {
+      return true;
+    }
+    return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
+  }
+
+  /**
+   * Choose node based on network topology.
+   * @param networkTopology network topology
+   * @param anchor anchor datanode to start with
+   * @param excludedNodes excluded datanodes
+   * @return chosen datanode
+   */
+  @VisibleForTesting
+  protected DatanodeDetails chooseNodeFromNetworkTopology(
+      NetworkTopology networkTopology, DatanodeDetails anchor,
+      List<DatanodeDetails> excludedNodes) {
+    Preconditions.checkArgument(networkTopology != null);
+
+    Collection<Node> excluded = new ArrayList<>();
+    if (excludedNodes != null && excludedNodes.size() != 0) {
+      excluded.addAll(excludedNodes);
+    }
+    excluded.add(anchor);
+
+    Node pick = networkTopology.chooseRandom(
+        anchor.getNetworkLocation(), excluded);
+    DatanodeDetails pickedNode = (DatanodeDetails) pick;
+    // exclude the picked node for next time
+    if (excludedNodes != null) {
+      excludedNodes.add(pickedNode);
+    }
+    return pickedNode;
+  }
+}
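
To make the intended call pattern concrete, here is a hedged usage sketch modeled on the new unit test further below (the MockNodeManager, node count and class name are assumptions for illustration):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.MockNodeManager;
    import org.apache.hadoop.hdds.scm.exceptions.SCMException;
    import org.apache.hadoop.hdds.scm.pipeline.PipelinePlacementPolicy;

    public class PipelinePlacementPolicyExample {
      public static void main(String[] args) throws SCMException {
        // Assumption: a mock node manager pre-populated with 10 healthy datanodes.
        MockNodeManager nodeManager = new MockNodeManager(true, 10);
        PipelinePlacementPolicy policy =
            new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
        // Ask for 3 datanodes; no excluded or favored nodes, and size is not used.
        List<DatanodeDetails> nodes = policy.chooseDatanodes(
            new ArrayList<>(), new ArrayList<>(), 3, 0);
        System.out.println("Chosen datanodes: " + nodes);
      }
    }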
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 87cc177..613146d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -16,11 +16,13 @@
  */
 package org.apache.hadoop.hdds.scm.container;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
 import org.apache.hadoop.hdds.scm.net.Node;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -86,7 +88,7 @@ public class MockNodeManager implements NodeManager {
   private final SCMNodeStat aggregateStat;
   private boolean safemode;
   private final Map<UUID, List<SCMCommand>> commandMap;
-  private final Node2PipelineMap node2PipelineMap;
+  private Node2PipelineMap node2PipelineMap;
   private final Node2ContainerMap node2ContainerMap;
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
@@ -100,6 +102,7 @@ public class MockNodeManager implements NodeManager {
     this.node2ContainerMap = new Node2ContainerMap();
     this.dnsToUuidMap = new ConcurrentHashMap<>();
     aggregateStat = new SCMNodeStat();
+    clusterMap = new NetworkTopologyImpl(new Configuration());
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dd = TestUtils.randomDatanodeDetails();
@@ -251,6 +254,16 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Get the count of pipelines a datanode is associated with.
+   * @param datanodeDetails DatanodeDetails
+   * @return The number of pipelines
+   */
+  @Override
+  public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+    return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+  }
+
+  /**
    * Add pipeline information in the NodeManager.
    * @param pipeline - Pipeline to be added
    */
@@ -260,6 +273,22 @@ public class MockNodeManager implements NodeManager {
   }
 
   /**
+   * Get the entire Node2PipelineMap.
+   * @return Node2PipelineMap
+   */
+  public Node2PipelineMap getNode2PipelineMap() {
+    return node2PipelineMap;
+  }
+
+  /**
+   * Set the Node2PipelineMap.
+   * @param node2PipelineMap Node2PipelineMap
+   */
+  public void setNode2PipelineMap(Node2PipelineMap node2PipelineMap) {
+    this.node2PipelineMap = node2PipelineMap;
+  }
+
+  /**
    * Remove a pipeline information from the NodeManager.
    * @param pipeline - Pipeline to be removed
    */
@@ -517,6 +546,11 @@ public class MockNodeManager implements NodeManager {
     return results;
   }
 
+  @Override
+  public NetworkTopology getClusterNetworkTopologyMap() {
+    return clusterMap;
+  }
+
   public void setNetworkTopology(NetworkTopology topology) {
     this.clusterMap = topology;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
new file mode 100644
index 0000000..2e0d0b1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.*;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Test for PipelinePlacementPolicy.
+ */
+public class TestPipelinePlacementPolicy {
+  private MockNodeManager nodeManager;
+  private PipelinePlacementPolicy placementPolicy;
+  private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true,
+        PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    placementPolicy =
+        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+  }
+
+  @Test
+  public void testChooseNodeBasedOnNetworkTopology() {
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+    // anchor should be removed from healthyNodes after being chosen.
+    Assert.assertFalse(healthyNodes.contains(anchor));
+
+    List<DatanodeDetails> excludedNodes =
+        new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+    DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
+        nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
+    // excludedNodes should contain nextNode after being chosen.
+    Assert.assertTrue(excludedNodes.contains(nextNode));
+    // nextNode should not be the same as anchor.
+    Assert.assertNotEquals(anchor.getUuid(), nextNode.getUuid());
+  }
+
+  @Test
+  public void testChooseNodeBasedOnRackAwareness() {
+    List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+    DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+    NetworkTopology topologyWithDifRacks =
+        createNetworkTopologyOnDifRacks();
+    DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+        healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        topologyWithDifRacks, anchor);
+    Assert.assertFalse(topologyWithDifRacks.isSameParent(anchor, nextNode));
+  }
+
+  private final static Node[] NODES = new NodeImpl[] {
+      new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
+      new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
+  };
+
+
+  private NetworkTopology createNetworkTopologyOnDifRacks() {
+    NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+    for (Node n : NODES) {
+      topology.add(n);
+    }
+    return topology;
+  }
+
+  private List<DatanodeDetails> overWriteLocationInNodes(
+      List<DatanodeDetails> datanodes) {
+    List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
+    for (int i = 0; i < datanodes.size(); i++) {
+      DatanodeDetails datanode = datanodes.get(i);
+      DatanodeDetails result = DatanodeDetails.newBuilder()
+          .setUuid(datanode.getUuidString())
+          .setHostName(datanode.getHostName())
+          .setIpAddress(datanode.getIpAddress())
+          .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
+          .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
+          .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
+          .setNetworkLocation(NODES[i].getNetworkLocation()).build();
+      results.add(result);
+    }
+    return results;
+  }
+
+  @Test
+  public void testHeavyNodeShouldBeExcluded() throws SCMException {
+    List<DatanodeDetails> healthyNodes =
+        nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+    int nodesRequired = healthyNodes.size()/2;
+    // only a minority of healthy nodes are heavily engaged in pipelines.
+    int minorityHeavy = healthyNodes.size()/2 - 1;
+    List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
+        new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+        nodesRequired, 0);
+    // modify node to pipeline mapping.
+    insertHeavyNodesIntoNodeManager(healthyNodes, minorityHeavy);
+    // nodes should be sufficient.
+    Assert.assertEquals(nodesRequired, pickedNodes1.size());
+    // make sure the pipeline placement policy won't select duplicated nodes.
+    Assert.assertTrue(checkDuplicateNodesUUID(pickedNodes1));
+
+    // a majority of healthy nodes are heavily engaged in pipelines.
+    int majorityHeavy = healthyNodes.size()/2 + 2;
+    insertHeavyNodesIntoNodeManager(healthyNodes, majorityHeavy);
+    boolean thrown = false;
+    List<DatanodeDetails> pickedNodes2 = null;
+    try {
+      pickedNodes2 = placementPolicy.chooseDatanodes(
+          new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+          new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+          nodesRequired, 0);
+    } catch (SCMException e) {
+      Assert.assertFalse(thrown);
+      thrown = true;
+    }
+    // nodes should NOT be sufficient and an exception should be thrown.
+    Assert.assertNull(pickedNodes2);
+    Assert.assertTrue(thrown);
+  }
+
+  private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
+    HashSet<UUID> uuids = nodes.stream().
+        map(DatanodeDetails::getUuid).
+        collect(Collectors.toCollection(HashSet::new));
+    return uuids.size() == nodes.size();
+  }
+
+  private Set<PipelineID> mockPipelineIDs(int count) {
+    Set<PipelineID> pipelineIDs = new HashSet<>(count);
+    for (int i = 0; i < count; i++) {
+      pipelineIDs.add(PipelineID.randomId());
+    }
+    return pipelineIDs;
+  }
+
+  private void insertHeavyNodesIntoNodeManager(
+      List<DatanodeDetails> nodes, int heavyNodeCount) throws SCMException {
+    if (nodes == null) {
+      throw new SCMException("",
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+
+    int considerHeavyCount =
+        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+
+    Node2PipelineMap mockMap = new Node2PipelineMap();
+    for (DatanodeDetails node : nodes) {
+      // mock heavy node
+      if (heavyNodeCount > 0) {
+        mockMap.insertNewDatanode(
+            node.getUuid(), mockPipelineIDs(considerHeavyCount));
+        heavyNodeCount--;
+      } else {
+        mockMap.insertNewDatanode(node.getUuid(), mockPipelineIDs(1));
+      }
+    }
+    nodeManager.setNode2PipelineMap(mockMap);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 0ecff3f..7e8ec52 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -166,6 +167,16 @@ public class ReplicationNodeManagerMock implements NodeManager {
   }
 
   /**
+   * Get the count of pipelines a datanode is associated with.
+   * @param dnId DatanodeDetails
+   * @return The number of pipelines
+   */
+  @Override
+  public int getPipelinesCount(DatanodeDetails dnId) {
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  /**
    * Add pipeline information in the NodeManager.
    * @param pipeline - Pipeline to be added
    */
@@ -327,4 +338,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
   public List<DatanodeDetails> getNodesByAddress(String address) {
     return new LinkedList<>();
   }
+
+  @Override
+  public NetworkTopology getClusterNetworkTopologyMap() {
+    return null;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


[hadoop-ozone] 03/04: HDDS-2089: Add createPipeline CLI. (#1418)

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 2b9ad2df38308525d94daf2d3d5ed417b01c9c18
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Sep 13 07:01:16 2019 +0800

    HDDS-2089: Add createPipeline CLI. (#1418)
    
    (cherry picked from commit 326b5acd4a63fe46821919322867f5daff30750c)
---
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |  1 +
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |  2 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  8 +--
 .../scm/cli/pipeline/CreatePipelineSubcommand.java | 71 ++++++++++++++++++++++
 .../hdds/scm/cli/pipeline/PipelineCommands.java    |  1 +
 5 files changed, 78 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
index c3e9440..fada2d8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/audit/SCMAction.java
@@ -31,6 +31,7 @@ public enum SCMAction implements AuditAction {
   GET_CONTAINER,
   GET_CONTAINER_WITH_PIPELINE,
   LIST_CONTAINER,
+  CREATE_PIPELINE,
   LIST_PIPELINE,
   CLOSE_PIPELINE,
   ACTIVATE_PIPELINE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..54e2141 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -48,7 +48,7 @@ public class SimplePipelineProvider implements PipelineProvider {
       String e = String
           .format("Cannot create pipeline of factor %d using %d nodes.",
               factor.getNumber(), dns.size());
-      throw new IOException(e);
+      throw new InsufficientDatanodesException(e);
     }
 
     Collections.shuffle(dns);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index 80a6a07..efe9a55 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -396,10 +396,10 @@ public class SCMClientProtocolServer implements
   public Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
       HddsProtos.ReplicationFactor factor, HddsProtos.NodePool nodePool)
       throws IOException {
-    // TODO: will be addressed in future patch.
-    // This is needed only for debugging purposes to make sure cluster is
-    // working correctly.
-    return null;
+    Pipeline result = scm.getPipelineManager().createPipeline(type, factor);
+    AUDIT.logWriteSuccess(
+        buildAuditMessageForSuccess(SCMAction.CREATE_PIPELINE, null));
+    return result;
   }
 
   @Override
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java
new file mode 100644
index 0000000..edeb786
--- /dev/null
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.cli.pipeline;
+
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.cli.SCMCLI;
+import org.apache.hadoop.hdds.scm.client.ScmClient;
+import picocli.CommandLine;
+
+import java.util.concurrent.Callable;
+
+/**
+ * Handler of createPipeline command.
+ */
+@CommandLine.Command(
+    name = "createPipeline",
+    description = "create pipeline",
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+public class CreatePipelineSubcommand implements Callable<Void> {
+  @CommandLine.ParentCommand
+  private SCMCLI parent;
+
+  @CommandLine.Option(
+      names = {"-t", "--replicationType"},
+      description = "Replication type (STAND_ALONE, RATIS)",
+      defaultValue = "STAND_ALONE"
+  )
+  private HddsProtos.ReplicationType type
+      = HddsProtos.ReplicationType.STAND_ALONE;
+
+  @CommandLine.Option(
+      names = {"-f", "--replicationFactor"},
+      description = "Replication factor (ONE, THREE)",
+      defaultValue = "ONE"
+  )
+  private HddsProtos.ReplicationFactor factor
+      = HddsProtos.ReplicationFactor.ONE;
+
+  @Override
+  public Void call() throws Exception {
+    if (type == HddsProtos.ReplicationType.CHAINED) {
+      throw new IllegalArgumentException(type.name()
+          + " is not supported yet.");
+    }
+    try (ScmClient scmClient = parent.createScmClient()) {
+      scmClient.createReplicationPipeline(
+          type,
+          factor,
+          HddsProtos.NodePool.getDefaultInstance());
+      return null;
+    }
+  }
+}
\ No newline at end of file
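
For context, the subcommand is a thin wrapper over the existing ScmClient API; a hedged programmatic equivalent (how the ScmClient is obtained is left to the caller, and the class name is illustrative) could look like:

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.scm.client.ScmClient;

    /** Sketch only: programmatic equivalent of "createPipeline -t RATIS -f THREE". */
    public final class CreatePipelineExample {
      private CreatePipelineExample() { }

      public static void createRatisPipeline(ScmClient scmClient) throws Exception {
        // Same call the CLI handler makes, with the default node pool.
        scmClient.createReplicationPipeline(
            HddsProtos.ReplicationType.RATIS,
            HddsProtos.ReplicationFactor.THREE,
            HddsProtos.NodePool.getDefaultInstance());
      }
    }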
diff --git a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/PipelineCommands.java b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/PipelineCommands.java
index 948a51a..0bdbc19 100644
--- a/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/PipelineCommands.java
+++ b/hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/PipelineCommands.java
@@ -37,6 +37,7 @@ import java.util.concurrent.Callable;
         ListPipelinesSubcommand.class,
         ActivatePipelineSubcommand.class,
         DeactivatePipelineSubcommand.class,
+        CreatePipelineSubcommand.class,
         ClosePipelineSubcommand.class
     })
 public class PipelineCommands implements Callable<Void> {


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org