Posted to hdfs-commits@hadoop.apache.org by sa...@apache.org on 2019/10/15 07:58:46 UTC

[hadoop-ozone] branch HDDS-2034 updated (11aa210 -> d13f960)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a change to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git.


 discard 11aa210  Update per comments
 discard 4fd969d  fix checkstyle issues
 discard f7b9ad3  HDDS-2034. sync RATIS pipeline creation and destroy through heartbeat commands.
    omit a9b88c3  HDDS-2089: Add createPipeline CLI. (#1418)
    omit 06492fa  HDDS-1571. Create an interface for pipeline placement policy to support network topologies. (#1395)
    omit ebbf681  HDDS-1577. Add default pipeline placement policy implementation. (#1366)
     add 085af1a  HDDS-2161. Create RepeatedKeyInfo structure to be saved in deletedTable
     add 2078440  HDDS-2160. Add acceptance test for ozonesecure-mr compose. Contributed by Xiaoyu Yao. (#1490)
     add 98fc3f1  HDDS-2159. Fix Race condition in ProfileServlet#pid.
     add f07b133  HDDS-2081. Fix TestRatisPipelineProvider#testCreatePipelinesDnExclude. Contributed by Aravindan Vijayan. (#1506)
     add 1da6dd0  HDDS-2167. Hadoop31-mr acceptance test is failing due to the shading
     add bd1b9e5  HDDS-2170. Add Object IDs and Update ID to Volume Object (#1510)
     add 06661b8  HDDS-2172.Ozone shell should remove description about REST protocol support. Contributed by Siddharth Wagle.
     add 8bfbc71  HDDS-2168. TestOzoneManagerDoubleBufferWithOMResponse sometimes fails with out of memory error (#1509)
     add ea36385  HDDS-2171. Dangling links in test report due to incompatible realpath
     add 17cbae1  HDDS-1738. Add nullable annotation for OMResponse classes
     add 191d4f2  HDDS-2165. Freon fails if bucket does not exists
     add 84c03a7  HDDS-2067. Create generic service facade with tracing/metrics/logging support
     add adec66f  HDDS-2182. Fix checkstyle violations introduced by HDDS-1738
     add f7d9fbd  HDDS-2180. Add Object ID and update ID on VolumeList Object. (#1526)
     add 861fae2  HDDS-2174. Delete GDPR Encryption Key from metadata when a Key is deleted
     add 4eafc11  HDDS-2193. Adding container related metrics in SCM.
     add 5f6db5d  HDDS-2179. ConfigFileGenerator fails with Java 10 or newer
     add abdd2bd  HDDS-2149. Replace findbugs with spotbugs
     add 1a93d9d  HDDS-2185. createmrenv failure not reflected in acceptance test result
     add 90d7cc4  HDDS-1146. Adding container related metrics in SCM. (#1541)
     add 57dfd7c  HDDS-2183. Container and pipline subcommands of scmcli should be grouped
     add 1f7d8bf  HDDS-2153. Add a config to tune max pending requests in Ratis leader
     add b889902  HDDS-2202. Remove unused import in OmUtils
     add 737546b  HDDS-2207. Update Ratis to latest snapshot. Contributed by Shashikant Bannerjee. (#1550)
     add ffd2f8b  HDDS-2205. checkstyle.sh reports wrong failure count
     add 3582431  HDDS-1615. ManagedChannel references are being leaked in ReplicationSupervisor.java. Contributed by  Mukul Kumar Singh. (#1547)
     add 6062a1a  HDDS-2166. Some RPC metrics are missing from SCM prometheus endpoint
     add 911f8b8  HDDS-2210. ContainerStateMachine should not be marked unhealthy if applyTransaction fails with closed container exception(#1552).
     add 56d2873  HDDS-2187. ozone-mr test fails with No FileSystem for scheme "o3fs"
     add bdb3081  HDDS-2201. Rename VolumeList to UserVolumeInfo. (#1566)
     add 0334f45  HDDS-2068. Make StorageContainerDatanodeProtocolService message based
     add cf69043  HDDS-2073. Make SCMSecurityProtocol message based. Contributed by Elek, Marton.
     add 3d013c5  HDDS-2227. GDPR key generation could benefit from secureRandom. (#1574)
     add 3a7bbdd  HDDS-2162. Make OM Generic related configuration support HA style config. (#1511)
     add 1754b32  HDDS-2224. Fix loadup cache for cache cleanup policy NEVER. (#1567)
     add bd304bc  HDDS-2019. Handle Set DtService of token in S3Gateway for OM HA. (#1489)
     add 2be51cf  HDDS-2072. Make StorageContainerLocationProtocolService message based Contributed by Elek, Marton.
     add 67100ff  HDDS-2228. Fix NPE in OzoneDelegationTokenManager#addPersistedDelegat… (#1571)
     add c8d3ca1  HDDS-2226. S3 Secrets should use a strong RNG. (#1572)
     add 8560edf  HDDS-2211. Collect docker logs if env fails to start (#1553)
     add 2aace9e  HDDS-2234. rat.sh fails due to ozone-recon-web/build files (#1580)
     add 5249f99  HDDS-2231. test-single.sh cannot copy results (#1575)
     add 751c8d0  HDDS-1720 : Add ability to configure RocksDB logs for Ozone Manager.
     add 86ce58d  HDDS-2200 : Recon does not handle the NULL snapshot from OM DB cleanly.
     add 1135b7c  HDDS-2198. SCM should not consider containers in CLOSING state to come out of safemode. (#1540)
     add 2cd1864  HDDS-2223. Support ReadWrite lock in LockManager. (#1564)
     add 7fbee75  HDDS-2225. SCM fails to start in most unsecure environments due to leftover secure config
     add 80ac600  HDDS-2222 (#1578)
     add 1534e35  Revert "HDDS-2222 (#1578)" (#1594)
     add a2ebf5a  HDDS-2230. Invalid entries in ozonesecure-mr config
     add 55f9233  HDDS-2140. Add robot test for GDPR feature
     add 49ed516  HDDS-2199. In SCMNodeManager dnsToUuidMap cannot track multiple DNs on the same host
     add 5119799  HDDS-2216. Rename HADOOP_RUNNER_VERSION to OZONE_RUNNER_VERSION in compose .env files.
     add a6b8b16  HDDS-2222. Add a method to update ByteBuffer in PureJavaCrc32/PureJavaCrc32C. (#1595)
     add 6934402  HDDS-2230. Invalid entries in ozonesecure-mr config. (Addendum)
     add b6461c3  HDDS-2237. KeyDeletingService throws NPE if it's started too early (#1584)
     add c7232bb  HDDS-2164 : om.db.checkpoints is getting filling up fast. (#1536)
     add e87d6df  HDDS-2158. Fixing Json Injection Issue in JsonUtils. (#1486)
     add 5583014  HDDS-2250. Generated configs missing from ozone-filesystem-lib jars
     add 37bc8b0  HDDS-2257. Fix checkstyle issues in ChecksumByteBuffer (#1603)
     add 4d285c6  HDDS-2251. Add an option to customize unit.sh and integration.sh parameters
     add e70ea7b  HDDS-2169. Avoid buffer copies while submitting client requests in Ratis. Contributed by Tsz-wo Sze(#1517).
     add ed47ffe  HDDS-2252. Enable gdpr robot test in daily build
     add 83a5715  HDDS-2239. Fix TestOzoneFsHAUrls (#1600)
     add 719240c  HDDS-2238. Container Data Scrubber spams log in empty cluster
     add dd2bafa  HDDS-2264. Improve output of TestOzoneContainer
     add 796c057  HDDS-2259. Container Data Scrubber computes wrong checksum
     add a7fdbf2  HDDS-2262. SLEEP_SECONDS: command not found
     add e1eb29d  HDDS-2245. Use dynamic ports for SCM in TestSecureOzoneCluster Contributed by kevin su.
     add 6a580fd  HDDS-2260. Avoid evaluation of LOG.trace and LOG.debug statement in the read/write path (HDDS). (#1612)
     add 70cf448  HDDS-2244. Use new ReadWrite lock in OzoneManager. (#1589)
     add 8ec4ba2  HDDS-2233 - Remove ByteStringHelper and refactor the code to the place where it used (#1596)
     add a15782a  HDDS-2217. Remove log4j and audit configuration from the docker-config files
     add 3d5e915  HDDS-2217. Remove log4j and audit configuration from the docker-config files
     add 71cc36f  Squashed commit of the following:
     add 28e6203  HDDS-2265. integration.sh may report false negative
     add 253faed  HDDS-2266. Avoid evaluation of LOG.trace and LOG.debug statement in the read/write path. (#1633)
     add 9ff77cd  HDDS-2269. Provide config for fair/non-fair for OM RW Lock. (#1623)
     add f89eda1  HDDS-1984. Fix listBucket API. (#1555)
     add 640255a  HDDS-1986. Fix listkeys API. (#1588)
     add 2862cdd  HDDS-2282. scmcli pipeline list command throws NullPointerException. Contributed by Xiaoyu Yao. (#1642)
     add 21ef389  HDDS-2213.Reduce key provider loading log level in OzoneFileSystem#getAdditionalTokenIssuers (#1556)
     add c8f14a5  HDDS-2287. Import common utility scripts and txt files from Hadoop without history.
     add e722e4d  HDDS-2290. Rename pom.ozone.xml to pom.xml
     add f59d359  HDDS-2298. Fix maven warning about duplicated metrics-core jar
     add 920dde9  HDDS-2298. Fix maven warning about duplicated metrics-core jar.
     add e227ba4  HDDS-2220. HddsVolume needs a toString method.
     add 83b5a67  HDDS-2220. HddsVolume needs a toString method.
     add b3322b5  HDDS-2204. Avoid buffer coping in checksum verification. Contributed by Tsz Wo Nicholas Sze.
     new caefc4f  HDDS-2034. Async RATIS pipeline creation and destroy through datanode heartbeat commands.
     new d13f960  Fix checkstyle and code improvement

This update added new revisions after undoing existing ones.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This happens when a user
force-pushes a change, leaving the repository history looking
something like this:

 * -- * -- B -- O -- O -- O   (11aa210)
            \
             N -- N -- N   refs/heads/HDDS-2034 (d13f960)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 BUILDING.txt                                       | 511 +++++++++++++++++++
 .../main/license/src/LICENSE.txt => LICENSE.txt    |  53 +-
 .../src/main/license/src/NOTICE.txt => NOTICE.txt  |   5 +-
 README.txt                                         |   7 +
 .../hadoop-ozone.sh => dev-support/bin/qbt         |   7 +-
 .../bin/smart-apply-patch                          |   7 +-
 .../hadoop-ozone.sh => dev-support/bin/test-patch  |   7 +-
 dev-support/bin/yetus-wrapper                      | 188 +++++++
 .../framework => dev-support/byteman}/README.md    |  17 +-
 dev-support/byteman/hadooprpc.btm                  |  44 ++
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  17 +-
 .../hadoop/hdds/scm/XceiverClientManager.java      |  11 +-
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |  67 +--
 .../hdds/scm/client/ContainerOperationClient.java  |  10 +-
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |   6 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  43 +-
 .../apache/hadoop/hdds/scm/storage/BufferPool.java |  15 +
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |   8 +-
 .../common/dev-support/findbugsExcludeFile.xml     |   5 +
 hadoop-hdds/common/pom.xml                         |   4 +-
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |   6 +-
 .../apache/hadoop/hdds/conf/HddsConfServlet.java   |   4 +-
 .../function/FunctionWithServiceException.java     |  21 +-
 .../hdds/{cli => function}/package-info.java       |   4 +-
 .../SCMSecurityProtocolClientSideTranslatorPB.java | 104 ++--
 .../SCMSecurityProtocolServerSideTranslatorPB.java | 132 -----
 .../hdds/ratis/ContainerCommandRequestMessage.java | 107 ++++
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  17 +-
 .../hadoop/hdds/scm/ByteStringConversion.java      |  62 +++
 .../apache/hadoop/hdds/scm/ByteStringHelper.java   |  69 ---
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  10 +-
 .../hadoop/hdds/scm/container/ContainerInfo.java   |   2 +-
 .../algorithms/ContainerPlacementPolicy.java}      |  12 +-
 .../placement/algorithms}/package-info.java        |   8 +-
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |  21 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  17 +-
 ...inerLocationProtocolClientSideTranslatorPB.java | 411 ++++++++-------
 .../hdds/scm/storage/ContainerProtocolCalls.java   |   2 +-
 .../hdds/security/token/BlockTokenVerifier.java    |   4 +-
 .../security/token/OzoneBlockTokenSelector.java    |   8 +-
 .../authority/PKIProfiles/DefaultProfile.java      |   4 +-
 .../hdds/security/x509/keys/HDDSKeyGenerator.java  |   6 +-
 .../apache/hadoop/hdds/tracing/StringCodec.java    |   4 +-
 .../hadoop/hdds/utils/BackgroundService.java       |   9 +-
 .../apache/hadoop/hdds/utils/HddsVersionInfo.java  |   6 +-
 .../org/apache/hadoop/hdds/utils/LevelDBStore.java |  20 +-
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |  24 +-
 .../hadoop/hdds/utils/db/RDBCheckpointManager.java |   2 +-
 .../hadoop/hdds/utils/db/RocksDBCheckpoint.java    |   3 +-
 .../hdds/utils/db/RocksDBConfiguration.java}       |  48 +-
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |   2 +-
 .../hadoop/hdds/utils/db/cache/CacheKey.java       |  11 +-
 .../hadoop/hdds/utils/db/cache/TableCache.java     |   9 +
 .../hadoop/hdds/utils/db/cache/TableCacheImpl.java |  19 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   8 +
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   2 +
 .../org/apache/hadoop/ozone/audit/SCMAction.java   |   1 -
 .../org/apache/hadoop/ozone/common/Checksum.java   | 218 ++++----
 .../hadoop/ozone/common/ChecksumByteBuffer.java    | 122 +++++
 .../apache/hadoop/ozone/common/ChecksumData.java   |   4 +-
 .../ozone/common/PureJavaCrc32ByteBuffer.java      | 556 ++++++++++++++++++++
 .../ozone/common/PureJavaCrc32CByteBuffer.java     | 559 +++++++++++++++++++++
 .../helpers/ContainerCommandRequestPBHelper.java   |  16 +-
 .../hadoop/ozone/lease/LeaseCallbackExecutor.java  |   2 +-
 .../apache/hadoop/ozone/lease/LeaseManager.java    |   6 +-
 .../org/apache/hadoop/ozone/lock/ActiveLock.java   |  72 ++-
 .../org/apache/hadoop/ozone/lock/LockManager.java  | 183 ++++++-
 .../hadoop/ozone/lock/PooledLockFactory.java       |   7 +-
 ...inerLocationProtocolServerSideTranslatorPB.java | 395 ---------------
 .../apache/hadoop/ozone/web/utils/JsonUtils.java   |   5 +-
 .../src/main/proto/SCMSecurityProtocol.proto       |  96 ++--
 .../src/main/proto/ScmBlockLocationProtocol.proto  |   2 +-
 .../proto/StorageContainerLocationProtocol.proto   | 185 ++++---
 .../common/src/main/resources/ozone-default.xml    |  32 +-
 .../ratis/TestContainerCommandRequestMessage.java  | 152 ++++++
 .../hadoop/hdds/utils/db/TestDBStoreBuilder.java   |  16 +-
 .../apache/hadoop/ozone/common/TestChecksum.java   |   6 +-
 .../ozone/common/TestChecksumByteBuffer.java       | 102 ++++
 .../apache/hadoop/ozone/lock/TestLockManager.java  | 145 +++++-
 .../hadoop/hdds/conf/ConfigFileGenerator.java      |   5 +-
 hadoop-hdds/container-service/pom.xml              |   9 +-
 .../ozone/container/common/impl/ContainerSet.java  |  64 ++-
 .../container/common/impl/HddsDispatcher.java      |  11 +-
 .../RandomContainerDeletionChoosingPolicy.java     |  11 +-
 ...TopNOrderedContainerDeletionChoosingPolicy.java |  13 +-
 .../common/report/PipelineReportPublisher.java     |   1 -
 .../common/statemachine/DatanodeStateMachine.java  |   8 -
 .../common/statemachine/EndpointStateMachine.java  |   4 +-
 .../CloseContainerCommandHandler.java              |   6 +-
 .../commandhandler/DeleteBlocksCommandHandler.java |  28 +-
 .../server/ratis/ContainerStateMachine.java        |  72 ++-
 .../transport/server/ratis/XceiverServerRatis.java |  13 +-
 .../ozone/container/common/volume/HddsVolume.java  |   8 +
 .../container/common/volume/HddsVolumeChecker.java |  14 +-
 .../common/volume/ThrottledAsyncChecker.java       |   8 +-
 .../container/keyvalue/KeyValueBlockIterator.java  |   6 +-
 .../container/keyvalue/KeyValueContainerCheck.java |  34 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  | 117 +++--
 .../container/keyvalue/helpers/ChunkUtils.java     |  42 +-
 .../container/keyvalue/impl/BlockManagerImpl.java  |   8 +-
 .../keyvalue/impl/ChunkManagerDummyImpl.java       |   6 +-
 .../container/keyvalue/impl/ChunkManagerImpl.java  |  69 ++-
 .../keyvalue/interfaces/ChunkManager.java          |   2 +-
 .../background/BlockDeletingService.java           |   6 +-
 .../container/ozoneimpl/ContainerController.java   |   6 +-
 .../container/ozoneimpl/ContainerDataScanner.java  |  50 +-
 .../ozoneimpl/ContainerDataScrubberMetrics.java    |   4 +-
 .../ozoneimpl/ContainerMetadataScanner.java        |  19 +-
 .../ContainerMetadataScrubberMetrics.java          |   5 +-
 .../ozoneimpl/ContainerScrubberConfiguration.java  |  17 +
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   8 +-
 .../replication/GrpcReplicationClient.java         |   6 +
 ...inerDatanodeProtocolClientSideTranslatorPB.java |  60 ++-
 ...inerDatanodeProtocolServerSideTranslatorPB.java | 115 +++--
 .../proto/StorageContainerDatanodeProtocol.proto   |  58 ++-
 .../ozone/container/common/SCMTestUtils.java       |   4 +-
 .../container/common/impl/TestContainerSet.java    |  18 +-
 .../container/keyvalue/TestChunkManagerImpl.java   |  69 +--
 .../keyvalue/TestKeyValueContainerCheck.java       |  69 ++-
 .../ozoneimpl/TestContainerScrubberMetrics.java    |  25 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |  23 +-
 hadoop-hdds/docs/content/beyond/Containers.md      |   2 +-
 hadoop-hdds/docs/content/start/FromSource.md       |   2 +-
 .../server/OzoneProtocolMessageDispatcher.java     |  88 ++++
 .../apache/hadoop/hdds/server/ProfileServlet.java  |  10 +-
 .../hadoop/hdds/server/PrometheusMetricsSink.java  |  16 +-
 .../hadoop/hdds/server/events/EventQueue.java      |   2 +-
 .../hdds/server/TestPrometheusMetricsSink.java     |  77 ++-
 hadoop-hdds/pom.xml                                |  18 +-
 hadoop-hdds/server-scm/pom.xml                     |   4 +-
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  38 +-
 .../hdds/scm/block/SCMBlockDeletingService.java    |  12 +-
 .../scm/command/CommandStatusReportHandler.java    |  30 +-
 .../container/AbstractContainerReportHandler.java  |  14 +-
 .../scm/container/ContainerActionsHandler.java     |   6 +-
 .../hdds/scm/container/ContainerManager.java       |   8 +
 .../hdds/scm/container/ContainerReportHandler.java |   2 +
 .../hdds/scm/container/ContainerStateManager.java  |   5 +-
 .../IncrementalContainerReportHandler.java         |  16 +-
 .../hdds/scm/container/ReplicationManager.java     |  13 +-
 .../hdds/scm/container/SCMContainerManager.java    |  48 +-
 .../metrics/SCMContainerManagerMetrics.java        | 144 ++++++
 .../scm/{ => container/metrics}/package-info.java  |   4 +-
 .../ContainerPlacementPolicyFactory.java           |  12 +-
 .../placement/algorithms/SCMCommonPolicy.java}     |  23 +-
 .../algorithms/SCMContainerPlacementCapacity.java  |   4 +-
 .../algorithms/SCMContainerPlacementRackAware.java |  12 +-
 .../algorithms/SCMContainerPlacementRandom.java    |   6 +-
 .../scm/container/states/ContainerAttribute.java   |  22 +-
 .../scm/container/states/ContainerStateMap.java    |   6 +-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |   2 -
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  22 +-
 .../hadoop/hdds/scm/node/NodeStateManager.java     |   9 -
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |  80 +--
 .../hdds/scm/node/states/Node2ObjectsMap.java      |   4 +-
 .../hdds/scm/node/states/Node2PipelineMap.java     |  12 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  11 +
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 346 -------------
 .../hdds/scm/pipeline/PipelineReportHandler.java   |   7 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  21 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  44 ++
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   2 +-
 .../SCMSecurityProtocolServerSideTranslatorPB.java | 186 +++++++
 ...lockLocationProtocolServerSideTranslatorPB.java |  56 +--
 ...inerLocationProtocolServerSideTranslatorPB.java | 393 +++++++++++++++
 .../hadoop/hdds/scm/protocol}/package-info.java    |   8 +-
 .../hdds/scm/safemode/ContainerSafeModeRule.java   |  26 +-
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |   2 +-
 .../safemode/OneReplicaPipelineSafeModeRule.java   |   6 +-
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  17 +
 .../hdds/scm/server/SCMBlockProtocolServer.java    |   9 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |  23 +-
 .../hdds/scm/server/SCMDatanodeProtocolServer.java | 114 ++---
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  27 +-
 .../hdds/scm/server/StorageContainerManager.java   |  22 +-
 .../hadoop/hdds/scm/container/MockNodeManager.java |  72 ++-
 .../container/TestCloseContainerEventHandler.java  |   4 +-
 .../hdds/scm/container/TestReplicationManager.java |   7 +-
 .../scm/container/TestSCMContainerManager.java     |   2 +-
 .../algorithms/TestContainerPlacementFactory.java  |   7 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   5 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |  67 ++-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  | 197 --------
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |   8 +-
 .../hdds/scm/safemode/TestSafeModeHandler.java     |   5 +-
 .../scm/server/TestSCMBlockProtocolServer.java     |   3 +-
 .../testutils/ReplicationNodeManagerMock.java      |  21 +-
 .../org/apache/hadoop/hdds/scm/cli/SCMCLI.java     |  45 +-
 .../hdds/scm/cli/container/CloseSubcommand.java    |   7 +-
 .../ContainerCommands.java}                        |  21 +-
 .../hdds/scm/cli/container/CreateSubcommand.java   |   5 +-
 .../hdds/scm/cli/container/DeleteSubcommand.java   |   7 +-
 .../hdds/scm/cli/container/InfoSubcommand.java     |   5 +-
 .../hdds/scm/cli/container/ListSubcommand.java     |   9 +-
 .../cli/pipeline/ActivatePipelineSubcommand.java   |  11 +-
 .../scm/cli/pipeline/ClosePipelineSubcommand.java  |  11 +-
 .../scm/cli/pipeline/CreatePipelineSubcommand.java |  71 ---
 .../cli/pipeline/DeactivatePipelineSubcommand.java |  11 +-
 .../scm/cli/pipeline/ListPipelinesSubcommand.java  |  11 +-
 .../PipelineCommands.java}                         |  20 +-
 .../client/io/BlockOutputStreamEntryPool.java      |  22 +-
 .../hadoop/ozone/client/io/KeyInputStream.java     |   6 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  18 +-
 hadoop-ozone/common/pom.xml                        |   4 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java | 154 +++---
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   8 +-
 .../hadoop/ozone/om/S3SecretManagerImpl.java       |   4 +-
 .../ozone/om/codec/RepeatedOmKeyInfoCodec.java     |  52 ++
 ...lumeListCodec.java => UserVolumeInfoCodec.java} |  13 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       |   6 +-
 .../hadoop/ozone/om/helpers/OMRatisHelper.java     |   4 +-
 .../hadoop/ozone/om/helpers/OmVolumeArgs.java      | 100 +++-
 .../hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java |  91 ++++
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     | 139 ++++-
 .../hadoop/ozone/security/GDPRSymmetricKey.java    |   8 +-
 .../security/OzoneBlockTokenSecretManager.java     |   2 +-
 .../OzoneDelegationTokenSecretManager.java         |  11 +-
 .../security/OzoneDelegationTokenSelector.java     |   8 +-
 .../hadoop/ozone/security/OzoneSecretManager.java  |  13 +-
 .../src/main/proto/OzoneManagerProtocol.proto      |  18 +-
 .../java/org/apache/hadoop/ozone/TestOmUtils.java  |  79 ++-
 .../ozone/security/TestGDPRSymmetricKey.java       |   4 +-
 hadoop-ozone/csi/pom.xml                           |   4 +-
 .../dev-support/checks/_mvn_unit_report.sh         |  21 +-
 hadoop-ozone/dev-support/checks/blockade.sh        |   2 +-
 hadoop-ozone/dev-support/checks/build.sh           |   2 +-
 hadoop-ozone/dev-support/checks/checkstyle.sh      |   4 +-
 hadoop-ozone/dev-support/checks/findbugs.sh        |  12 +-
 hadoop-ozone/dev-support/checks/integration.sh     |   6 +-
 hadoop-ozone/dev-support/checks/unit.sh            |   2 +-
 .../dist/src/main/assemblies/ozone-src.xml         |   7 +-
 hadoop-ozone/dist/src/main/compose/ozone-hdfs/.env |   2 +-
 .../main/compose/ozone-hdfs/docker-compose.yaml    |   6 +-
 .../dist/src/main/compose/ozone-hdfs/docker-config |  46 --
 .../dist/src/main/compose/ozone-mr/common-config   |   9 -
 .../dist/src/main/compose/ozone-mr/hadoop27/.env   |   2 +-
 .../compose/ozone-mr/hadoop27/docker-compose.yaml  |   8 +-
 .../dist/src/main/compose/ozone-mr/hadoop31/.env   |   2 +-
 .../compose/ozone-mr/hadoop31/docker-compose.yaml  |   8 +-
 .../dist/src/main/compose/ozone-mr/hadoop32/.env   |   2 +-
 .../compose/ozone-mr/hadoop32/docker-compose.yaml  |   8 +-
 .../dist/src/main/compose/ozone-om-ha/.env         |   2 +-
 .../main/compose/ozone-om-ha/docker-compose.yaml   |  10 +-
 .../src/main/compose/ozone-om-ha/docker-config     |  45 --
 .../dist/src/main/compose/ozone-recon/.env         |   2 +-
 .../main/compose/ozone-recon/docker-compose.yaml   |   8 +-
 .../src/main/compose/ozone-recon/docker-config     |  47 +-
 .../dist/src/main/compose/ozone-topology/.env      |   2 +-
 .../compose/ozone-topology/docker-compose.yaml     |  12 +-
 .../src/main/compose/ozone-topology/docker-config  |  49 --
 hadoop-ozone/dist/src/main/compose/ozone/.env      |   2 +-
 .../src/main/compose/ozone/docker-compose.yaml     |   6 +-
 .../dist/src/main/compose/ozone/docker-config      |  45 --
 hadoop-ozone/dist/src/main/compose/ozone/test.sh   |   2 +
 .../dist/src/main/compose/ozoneblockade/.env       |   2 +-
 .../main/compose/ozoneblockade/docker-compose.yaml |   8 +-
 .../src/main/compose/ozoneblockade/docker-config   |  45 --
 hadoop-ozone/dist/src/main/compose/ozoneperf/.env  |   2 +-
 .../src/main/compose/ozoneperf/docker-compose.yaml |  10 +-
 .../dist/src/main/compose/ozoneperf/docker-config  |  13 -
 .../dist/src/main/compose/ozones3-haproxy/.env     |   2 +-
 .../compose/ozones3-haproxy/docker-compose.yaml    |  12 +-
 .../src/main/compose/ozones3-haproxy/docker-config |  48 --
 hadoop-ozone/dist/src/main/compose/ozones3/.env    |   2 +-
 .../src/main/compose/ozones3/docker-compose.yaml   |   8 +-
 .../dist/src/main/compose/ozones3/docker-config    |  48 --
 .../dist/src/main/compose/ozonescripts/.env        |   2 +-
 .../src/main/compose/ozonescripts/docker-config    |   7 +-
 .../dist/src/main/compose/ozonesecure-mr/.env      |   2 +-
 .../compose/ozonesecure-mr/docker-compose.yaml     |  50 +-
 .../src/main/compose/ozonesecure-mr/docker-config  |  77 +--
 .../{ozone-mr/hadoop32 => ozonesecure-mr}/test.sh  |  12 +-
 .../dist/src/main/compose/ozonesecure/.env         |   2 +-
 .../main/compose/ozonesecure/docker-compose.yaml   |  10 +-
 .../src/main/compose/ozonesecure/docker-config     |  53 --
 .../dist/src/main/compose/ozonesecure/test.sh      |   2 +
 hadoop-ozone/dist/src/main/compose/test-all.sh     |   4 +-
 hadoop-ozone/dist/src/main/compose/test-single.sh  |   2 +
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  27 +-
 hadoop-ozone/dist/src/main/dockerbin/entrypoint.sh |   2 +-
 .../dist/src/main/smoketest/gdpr/gdpr.robot        |  89 ++++
 .../smoketest/{kinit.robot => kinit-hadoop.robot}  |   2 +-
 hadoop-ozone/dist/src/main/smoketest/kinit.robot   |   5 +-
 .../dist/src/main/smoketest/mapreduce.robot        |   2 +-
 .../{s3/__init__.robot => scmcli/pipeline.robot}   |  13 +-
 .../src/test/blockade/ozone/cluster.py             |   4 +-
 hadoop-ozone/insight/pom.xml                       |   9 +-
 .../ozone/insight/BaseInsightSubCommand.java       |   7 +-
 .../scm/ScmProtocolBlockLocationInsight.java       |   6 +-
 ...va => ScmProtocolContainerLocationInsight.java} |  18 +-
 ...nsight.java => ScmProtocolDatanodeInsight.java} |  27 +-
 ...nsight.java => ScmProtocolSecurityInsight.java} |  18 +-
 .../metrics/TestSCMContainerManagerMetrics.java    | 167 ++++++
 .../hdds/scm/pipeline/TestPipelineClose.java       |   4 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  26 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |   3 +-
 .../hadoop/ozone/TestContainerOperations.java      |   4 +-
 .../TestContainerStateMachineIdempotency.java      |   5 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  13 +
 .../rpc/TestContainerStateMachineFailures.java     |  72 ++-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |  76 ++-
 .../ozone/container/ContainerTestHelper.java       |  11 +-
 .../container/common/TestBlockDeletingService.java |  24 +-
 .../common/impl/TestContainerPersistence.java      |  53 +-
 .../hadoop/ozone/dn/scrubber/TestDataScrubber.java |  11 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |   4 -
 .../ozone/om/TestOzoneManagerConfiguration.java    |   9 +-
 .../ozone/om/TestOzoneManagerRocksDBLogging.java   |  97 ++++
 .../hadoop/ozone/scm/TestContainerSmallFile.java   |   4 +-
 .../scm/TestGetCommittedBlockLengthAndPutKey.java  |   5 +-
 hadoop-ozone/ozone-manager/pom.xml                 |   5 +-
 .../apache/hadoop/ozone/om/BucketManagerImpl.java  |  19 +-
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 106 ++--
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     |  59 +--
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  10 -
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     | 238 ++++++---
 .../hadoop/ozone/om/OpenKeyCleanupService.java     |   4 +-
 .../org/apache/hadoop/ozone/om/OzoneManager.java   | 242 ++-------
 .../apache/hadoop/ozone/om/PrefixManagerImpl.java  |  11 +-
 .../apache/hadoop/ozone/om/VolumeManagerImpl.java  |  63 ++-
 .../apache/hadoop/ozone/om/ha/OMHANodeDetails.java | 306 +++++++++++
 .../hadoop/ozone/om/{ => ha}/OMNodeDetails.java    |   2 +-
 .../apache/hadoop/ozone/om/ha/package-info.java    |   4 +-
 .../ozone/om/ratis/OzoneManagerDoubleBuffer.java   |   8 +-
 .../ozone/om/ratis/OzoneManagerRatisClient.java    |  53 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |   8 +-
 .../om/request/bucket/OMBucketCreateRequest.java   |  12 +-
 .../om/request/bucket/OMBucketDeleteRequest.java   |  16 +-
 .../request/bucket/OMBucketSetPropertyRequest.java |  13 +-
 .../om/request/bucket/acl/OMBucketAclRequest.java  |   6 +-
 .../request/bucket/acl/OMBucketSetAclRequest.java  |   4 +-
 .../om/request/file/OMDirectoryCreateRequest.java  |   4 +-
 .../ozone/om/request/file/OMFileCreateRequest.java |   4 +-
 .../ozone/om/request/key/OMKeyCommitRequest.java   |   4 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   4 +-
 .../ozone/om/request/key/OMKeyDeleteRequest.java   |   7 +-
 .../ozone/om/request/key/OMKeyRenameRequest.java   |   4 +-
 .../ozone/om/request/key/acl/OMKeyAclRequest.java  |   6 +-
 .../request/key/acl/prefix/OMPrefixAclRequest.java |   4 +-
 .../request/s3/bucket/S3BucketCreateRequest.java   |  30 +-
 .../request/s3/bucket/S3BucketDeleteRequest.java   |  13 +-
 .../S3InitiateMultipartUploadRequest.java          |   4 +-
 .../multipart/S3MultipartUploadAbortRequest.java   |   9 +-
 .../S3MultipartUploadCommitPartRequest.java        |   8 +-
 .../S3MultipartUploadCompleteRequest.java          |   4 +-
 .../om/request/s3/security/S3GetSecretRequest.java |   6 +-
 .../om/request/volume/OMVolumeCreateRequest.java   |  22 +-
 .../om/request/volume/OMVolumeDeleteRequest.java   |  15 +-
 .../ozone/om/request/volume/OMVolumeRequest.java   |  32 +-
 .../om/request/volume/OMVolumeSetOwnerRequest.java |  16 +-
 .../om/request/volume/OMVolumeSetQuotaRequest.java |   6 +-
 .../om/request/volume/acl/OMVolumeAclRequest.java  |   4 +-
 .../request/volume/acl/OMVolumeSetAclRequest.java  |   6 +-
 .../om/response/bucket/OMBucketCreateResponse.java |   8 +-
 .../om/response/bucket/OMBucketDeleteResponse.java |   4 +-
 .../bucket/OMBucketSetPropertyResponse.java        |   7 +-
 .../response/file/OMDirectoryCreateResponse.java   |   3 +-
 .../om/response/file/OMFileCreateResponse.java     |   3 +-
 .../om/response/key/OMAllocateBlockResponse.java   |   6 +-
 .../ozone/om/response/key/OMKeyCommitResponse.java |   7 +-
 .../ozone/om/response/key/OMKeyCreateResponse.java |   3 +-
 .../ozone/om/response/key/OMKeyDeleteResponse.java |  32 +-
 .../ozone/om/response/key/OMKeyPurgeResponse.java  |   4 +-
 .../ozone/om/response/key/OMKeyRenameResponse.java |   6 +-
 .../multipart/S3MultipartUploadAbortResponse.java  |  21 +-
 .../S3MultipartUploadCommitPartResponse.java       |  45 +-
 .../S3MultipartUploadCompleteResponse.java         |   3 +-
 .../om/response/volume/OMVolumeAclOpResponse.java  |   3 +-
 .../om/response/volume/OMVolumeCreateResponse.java |  13 +-
 .../om/response/volume/OMVolumeDeleteResponse.java |  10 +-
 .../response/volume/OMVolumeSetOwnerResponse.java  |  12 +-
 .../response/volume/OMVolumeSetQuotaResponse.java  |   4 +-
 .../om/snapshot/OzoneManagerSnapshotProvider.java  |   2 +-
 .../OzoneManagerHARequestHandlerImpl.java          |   4 +-
 ...OzoneManagerProtocolServerSideTranslatorPB.java |  45 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   4 +-
 .../ozone/security/acl/OzoneNativeAuthorizer.java  |   8 +-
 .../hadoop/ozone/web/ozShell/ObjectPrinter.java    |   3 +-
 .../org/apache/hadoop/ozone/web/ozShell/Shell.java |   3 +-
 .../web/ozShell/bucket/AddAclBucketHandler.java    |   5 +-
 .../web/ozShell/bucket/GetAclBucketHandler.java    |   4 +-
 .../web/ozShell/bucket/RemoveAclBucketHandler.java |   7 +-
 .../web/ozShell/bucket/SetAclBucketHandler.java    |   5 +-
 .../ozone/web/ozShell/keys/AddAclKeyHandler.java   |   5 +-
 .../ozone/web/ozShell/keys/GetAclKeyHandler.java   |   4 +-
 .../web/ozShell/keys/RemoveAclKeyHandler.java      |   7 +-
 .../ozone/web/ozShell/keys/SetAclKeyHandler.java   |   5 +-
 .../ozone/web/ozShell/token/GetTokenHandler.java   |   2 +-
 .../ozone/web/ozShell/token/PrintTokenHandler.java |   2 +-
 .../web/ozShell/volume/AddAclVolumeHandler.java    |   5 +-
 .../web/ozShell/volume/GetAclVolumeHandler.java    |   4 +-
 .../web/ozShell/volume/RemoveAclVolumeHandler.java |   7 +-
 .../web/ozShell/volume/SetAclVolumeHandler.java    |   5 +-
 .../hadoop/ozone/om/TestKeyDeletingService.java    |   3 +
 .../hadoop/ozone/om/TestOmMetadataManager.java     | 417 +++++++++++++++
 ...TestOzoneManagerDoubleBufferWithOMResponse.java |  21 +-
 .../om/ratis/TestOzoneManagerRatisServer.java      |   2 +-
 .../ozone/om/request/TestOMRequestUtils.java       |  85 +++-
 .../hadoop/ozone/om/request}/package-info.java     |   4 +-
 .../request/volume/TestOMVolumeCreateRequest.java  |  16 +-
 .../volume/TestOMVolumeSetOwnerRequest.java        |   4 +-
 .../ozone/om/response/TestOMResponseUtils.java     |   8 +-
 .../om/response/key/TestOMKeyDeleteResponse.java   |  20 +-
 .../hadoop/ozone/om/response}/package-info.java    |   4 +-
 .../s3/multipart/TestS3MultipartResponse.java      |   3 +-
 .../TestS3MultipartUploadAbortResponse.java        |  19 +-
 .../volume/TestOMVolumeCreateResponse.java         |   7 +-
 .../volume/TestOMVolumeDeleteResponse.java         |  11 +-
 .../volume/TestOMVolumeSetOwnerResponse.java       |  17 +-
 .../ozone/om/response/volume}/package-info.java    |   4 +-
 .../TestOzoneDelegationTokenSecretManager.java     |  29 +-
 hadoop-ozone/ozonefs-lib-current/pom.xml           |   7 +-
 hadoop-ozone/ozonefs-lib-legacy/pom.xml            |   4 +-
 hadoop-ozone/ozonefs/pom.xml                       |   9 +-
 .../fs/ozone/BasicOzoneClientAdapterImpl.java      |  59 +--
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |   4 +-
 .../apache/hadoop/fs/ozone/OzoneFileSystem.java    |   2 +-
 hadoop-ozone/pom.xml                               |  15 +-
 hadoop-ozone/recon/pom.xml                         |   4 +-
 .../org/apache/hadoop/ozone/recon/ReconUtils.java  |  96 ++++
 .../recon/recovery/ReconOmMetadataManagerImpl.java |  21 +-
 .../spi/impl/ContainerDBServiceProviderImpl.java   |  28 +-
 .../spi/impl/OzoneManagerServiceProviderImpl.java  |  33 +-
 .../recon/spi/impl/ReconContainerDBProvider.java   |  32 +-
 .../ozone/recon/AbstractOMMetadataManagerTest.java |   2 +-
 .../apache/hadoop/ozone/recon/TestReconUtils.java  |  75 ++-
 .../recovery/TestReconOmMetadataManagerImpl.java   | 133 +++--
 .../impl/TestOzoneManagerServiceProviderImpl.java  |  35 +-
 .../spi/impl/TestReconContainerDBProvider.java     |  13 -
 hadoop-ozone/s3gateway/pom.xml                     |   5 +-
 .../apache/hadoop/ozone/s3/AWSV4AuthParser.java    |  10 +-
 .../hadoop/ozone/s3/OzoneClientProducer.java       |  16 +-
 .../hadoop/ozone/s3/OzoneServiceProvider.java      |  50 +-
 .../ozone/s3/exception/OS3ExceptionMapper.java     |   4 +-
 .../apache/hadoop/ozone/s3/util/OzoneS3Util.java   |  44 ++
 .../hadoop/ozone/s3/util/TestOzoneS3Util.java      | 130 +++++
 hadoop-ozone/tools/pom.xml                         |  14 +-
 .../hadoop/ozone/freon/BaseFreonGenerator.java     |   3 +-
 .../org/apache/hadoop/ozone/scm/cli/SQLCLI.java    |   4 +-
 .../services/org.apache.hadoop.fs.FileSystem       |   0
 hadoop-ozone/upgrade/pom.xml                       |   5 +-
 pom.ozone.xml => pom.xml                           |  29 +-
 442 files changed, 9651 insertions(+), 4987 deletions(-)
 create mode 100644 BUILDING.txt
 copy hadoop-ozone/dist/src/main/license/src/LICENSE.txt => LICENSE.txt (82%)
 copy hadoop-ozone/dist/src/main/license/src/NOTICE.txt => NOTICE.txt (91%)
 create mode 100644 README.txt
 copy hadoop-ozone/common/src/main/shellprofile.d/hadoop-ozone.sh => dev-support/bin/qbt (83%)
 mode change 100644 => 100755
 copy hadoop-ozone/common/src/main/shellprofile.d/hadoop-ozone.sh => dev-support/bin/smart-apply-patch (84%)
 mode change 100644 => 100755
 copy hadoop-ozone/common/src/main/shellprofile.d/hadoop-ozone.sh => dev-support/bin/test-patch (83%)
 mode change 100644 => 100755
 create mode 100755 dev-support/bin/yetus-wrapper
 copy {hadoop-hdds/framework => dev-support/byteman}/README.md (63%)
 create mode 100644 dev-support/byteman/hadooprpc.btm
 copy hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/util/BooleanBiFunction.java => hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/function/FunctionWithServiceException.java (67%)
 copy hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/{cli => function}/package-info.java (88%)
 delete mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolServerSideTranslatorPB.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/ContainerCommandRequestMessage.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringConversion.java
 delete mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ByteStringHelper.java
 rename hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/{PlacementPolicy.java => container/placement/algorithms/ContainerPlacementPolicy.java} (80%)
 copy hadoop-hdds/{client/src/main/java/org/apache/hadoop/hdds/scm/client => common/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms}/package-info.java (86%)
 copy hadoop-hdds/{container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/RatisServerConfiguration.java => common/src/main/java/org/apache/hadoop/hdds/utils/db/RocksDBConfiguration.java} (51%)
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChecksumByteBuffer.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/PureJavaCrc32ByteBuffer.java
 create mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/PureJavaCrc32CByteBuffer.java
 delete mode 100644 hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
 create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/ratis/TestContainerCommandRequestMessage.java
 create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChecksumByteBuffer.java
 create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/OzoneProtocolMessageDispatcher.java
 create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/metrics/SCMContainerManagerMetrics.java
 copy hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/{ => container/metrics}/package-info.java (87%)
 rename hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/{SCMCommonPlacementPolicy.java => container/placement/algorithms/SCMCommonPolicy.java} (90%)
 delete mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
 create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
 rename hadoop-hdds/{common/src/main/java/org/apache/hadoop/ozone/protocolPB => server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol}/ScmBlockLocationProtocolServerSideTranslatorPB.java (85%)
 create mode 100644 hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
 copy hadoop-hdds/{tools/src/main/java/org/apache/hadoop/hdds/scm/cli => server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol}/package-info.java (88%)
 delete mode 100644 hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
 copy hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/{ReplicationManagerCommands.java => container/ContainerCommands.java} (73%)
 delete mode 100644 hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/pipeline/CreatePipelineSubcommand.java
 copy hadoop-hdds/tools/src/main/java/org/apache/hadoop/hdds/scm/cli/{ReplicationManagerCommands.java => pipeline/PipelineCommands.java} (73%)
 create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/RepeatedOmKeyInfoCodec.java
 rename hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/{VolumeListCodec.java => UserVolumeInfoCodec.java} (78%)
 create mode 100644 hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/RepeatedOmKeyInfo.java
 copy hadoop-ozone/dist/src/main/compose/{ozone-mr/hadoop32 => ozonesecure-mr}/test.sh (83%)
 create mode 100644 hadoop-ozone/dist/src/main/smoketest/gdpr/gdpr.robot
 copy hadoop-ozone/dist/src/main/smoketest/{kinit.robot => kinit-hadoop.robot} (94%)
 copy hadoop-ozone/dist/src/main/smoketest/{s3/__init__.robot => scmcli/pipeline.robot} (74%)
 copy hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/{ScmProtocolBlockLocationInsight.java => ScmProtocolContainerLocationInsight.java} (73%)
 copy hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/{ScmProtocolBlockLocationInsight.java => ScmProtocolDatanodeInsight.java} (67%)
 copy hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/{ScmProtocolBlockLocationInsight.java => ScmProtocolSecurityInsight.java} (75%)
 create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
 create mode 100644 hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java
 create mode 100644 hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ha/OMHANodeDetails.java
 rename hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/{ => ha}/OMNodeDetails.java (99%)
 copy hadoop-ozone/{common => ozone-manager}/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java (93%)
 create mode 100644 hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmMetadataManager.java
 copy hadoop-ozone/{integration-test/src/test/java/org/apache/hadoop/ozone/om => ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request}/package-info.java (92%)
 copy {hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node => hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response}/package-info.java (92%)
 copy hadoop-ozone/{common/src/test/java/org/apache/hadoop/ozone/om/lock => ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/volume}/package-info.java (91%)
 create mode 100644 hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/s3/util/TestOzoneS3Util.java
 copy hadoop-ozone/{ozonefs/src/test => tools/src/main}/resources/META-INF/services/org.apache.hadoop.fs.FileSystem (100%)
 rename pom.ozone.xml => pom.xml (99%)


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-help@hadoop.apache.org


[hadoop-ozone] 02/02: Fix checkstyle and code improvement

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit d13f9600c58e82c5e7ed434b97fbb31392a341f0
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Tue Oct 15 14:52:02 2019 +0800

    Fix checkstyle and code improvement
---
 .../org/apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  2 --
 .../common/report/PipelineReportPublisher.java         |  1 -
 .../common/statemachine/DatanodeStateMachine.java      |  8 --------
 .../hdds/scm/container/ContainerStateManager.java      | 18 +++++++++---------
 .../hdds/scm/pipeline/RatisPipelineProvider.java       |  1 -
 .../scm/container/TestCloseContainerEventHandler.java  |  9 +--------
 .../hdds/scm/pipeline/TestSCMPipelineManager.java      |  1 -
 .../client/rpc/TestContainerStateMachineFailures.java  |  7 ++++++-
 8 files changed, 16 insertions(+), 31 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index f2ca2fa..4414368 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -50,7 +50,6 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
   private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
-  private final long creationTime;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -65,7 +64,6 @@ public final class Pipeline {
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
-    this.creationTime = System.currentTimeMillis();
   }
 
   /**
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
index e1dd098..e7f4347 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -68,7 +68,6 @@ public class PipelineReportPublisher extends
 
   @Override
   protected PipelineReportsProto getReport() {
-    System.out.println("Pipeline Report Generate");
     return getContext().getParent().getContainer().getPipelineReport();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 926f19c..6832153 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -492,12 +492,4 @@ public class DatanodeStateMachine implements Closeable {
   public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public CertificateClient getCertificateClient() {
-    return dnCertClient;
-  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 78a944f..cefc185 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -249,19 +249,19 @@ public class ContainerStateManager {
       throws IOException {
 
     Pipeline pipeline;
-    final List<Pipeline> pipelines = pipelineManager
-        .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
-    if (pipelines.isEmpty()) {
-      try {
-        pipeline = pipelineManager.createPipeline(type, replicationFactor);
-        pipelineManager.waitPipelineReady(pipeline.getId(), 0);
-      } catch (IOException e) {
-        LOG.error("Fail to create pipeline for " + e.getMessage());
+    try {
+      // TODO: #CLUTIL remove creation logic when all replication types and
+      // factors are handled by pipeline creator job.
+      pipeline = pipelineManager.createPipeline(type, replicationFactor);
+      pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+    } catch (IOException e) {
+      final List<Pipeline> pipelines = pipelineManager
+          .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
+      if (pipelines.isEmpty()) {
         throw new IOException("Could not allocate container. Cannot get any" +
             " matching pipeline for Type:" + type +
             ", Factor:" + replicationFactor + ", State:PipelineState.OPEN");
       }
-    } else {
       pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
     }
     synchronized (pipeline) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 126b0c7..8663471 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -200,7 +200,6 @@ public class RatisPipelineProvider implements PipelineProvider {
    * @throws IOException
    */
   public void close(Pipeline pipeline) {
-    LOG.debug("Destroy pipeline:{}", pipeline.getId());
     final ClosePipelineCommand closeCommand =
         new ClosePipelineCommand(pipeline.getId());
     pipeline.getNodes().stream().forEach(node -> {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index b27f8a3..dca49cd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -68,6 +67,7 @@ public class TestCloseContainerEventHandler {
     configuration
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
+    eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
@@ -78,7 +78,6 @@ public class TestCloseContainerEventHandler {
     containerManager = new
         SCMContainerManager(configuration, nodeManager,
         pipelineManager, new EventQueue());
-    eventQueue = new EventQueue();
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
@@ -119,9 +118,6 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithValidContainers() throws IOException {
-    Pipeline pipeline = pipelineManager.createPipeline(
-        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
-    pipelineManager.openPipeline(pipeline.getId());
     ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, "ozone");
@@ -139,9 +135,6 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithRatis() throws IOException {
-    Pipeline pipeline = pipelineManager.createPipeline(
-        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
-    pipelineManager.openPipeline(pipeline.getId());
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
     ContainerInfo container = containerManager
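
The test change above fixes an initialization-order bug: eventQueue was
previously assigned only after SCMPipelineManager had been constructed with
it, so the manager captured a null reference. (The removed createPipeline
calls in the two tests follow from the ContainerStateManager change earlier
in this series: container allocation now attempts pipeline creation itself.)
Below is a minimal self-contained illustration of the ordering pitfall;
the names are hypothetical, not the Ozone classes.

    // Hypothetical minimal reproduction of the ordering bug fixed above.
    public class InitOrder {
      static class EventQueue {
        void fire() { System.out.println("event fired"); }
      }
      static class Manager {
        private final EventQueue queue;
        Manager(EventQueue q) { this.queue = q; }  // captures whatever it is handed
        void publish() { queue.fire(); }           // NPE if null was captured
      }
      public static void main(String[] args) {
        EventQueue queue = null;
        Manager broken = new Manager(queue);       // bug: queue is still null here
        queue = new EventQueue();                  // too late, broken keeps null
        try {
          broken.publish();
        } catch (NullPointerException e) {
          System.out.println("manager captured a null queue");
        }
        Manager fixed = new Manager(queue);        // fix: create the queue first
        fixed.publish();
      }
    }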
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 5d934eb..59bccc2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 9ac45b8..a45d258 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -68,6 +68,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
     ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -112,6 +114,8 @@ public class TestContainerStateMachineFailures {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
@@ -458,6 +462,7 @@ public class TestContainerStateMachineFailures {
     Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
     Assert
         .assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
+
     // write a new key
     key = objectStore.getVolume(volumeName).getBucket(bucketName)
         .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
@@ -481,7 +486,7 @@ public class TestContainerStateMachineFailures {
     byte[] blockCommitSequenceIdKey =
         DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
 
-    // modify the bcsid for the container in the ROCKS DB tereby inducing
+    // modify the bcsid for the container in the ROCKS DB thereby inducing
     // corruption
     db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
     db.decrementReference();




[hadoop-ozone] 01/02: HDDS-2034. Async RATIS pipeline creation and destroy through datanode heartbeat commands.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit caefc4f5bb10babcba429d77997733c85b249187
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Tue Oct 15 14:36:14 2019 +0800

    HDDS-2034. Async RATIS pipeline creation and destroy through datanode heartbeat commands.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  12 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   7 ++
 .../common/src/main/resources/ozone-default.xml    |  30 +++--
 .../common/report/PipelineReportPublisher.java     |   1 +
 .../common/statemachine/DatanodeStateMachine.java  |  14 +++
 .../CloseContainerCommandHandler.java              |  11 +-
 .../ClosePipelineCommandHandler.java               | 121 ++++++++++++++++++
 .../commandhandler/CommandHandler.java             |   2 +-
 .../CreatePipelineCommandHandler.java              | 135 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |  22 ++++
 .../common/transport/server/XceiverServerSpi.java  |  18 +++
 .../transport/server/ratis/XceiverServerRatis.java |  36 ++++++
 .../protocol/commands/ClosePipelineCommand.java    |  73 +++++++++++
 .../protocol/commands/CreatePipelineCommand.java   | 100 +++++++++++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  23 ++++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   3 +
 .../hdds/scm/container/ContainerStateManager.java  |  17 +--
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  12 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  13 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  15 ++-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java   |  34 +++---
 .../hdds/scm/pipeline/PipelineStateManager.java    |   2 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 132 +++++++-------------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 103 ----------------
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 102 +++++++++++++---
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |  10 ++
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   5 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  96 ++++++---------
 .../safemode/OneReplicaPipelineSafeModeRule.java   |  86 ++++++-------
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  30 ++++-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java  |   5 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   1 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  20 +++
 .../hdds/scm/server/StorageContainerManager.java   |   3 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  13 ++
 .../container/TestCloseContainerEventHandler.java  |  14 ++-
 .../scm/container/TestSCMContainerManager.java     |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   2 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  11 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   9 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |   3 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  45 +++----
 .../TestOneReplicaPipelineSafeModeRule.java        |  35 +-----
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  48 +++-----
 .../TestContainerStateManagerIntegration.java      |   4 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |   4 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  15 ++-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  37 +++---
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  12 +-
 50 files changed, 1039 insertions(+), 513 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
   public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
       "hdds.scm.safemode.pipeline-availability.check";
   public static final boolean
-      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+  public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+      "hdds.scm.safemode.pipeline.creation";
+  public static final boolean
+      HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
 
   // % of containers which should have at least one reported replica
   // before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
       "hdds.scm.safemode.threshold.pct";
   public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
 
-
   // percentage of healthy pipelines, where all 3 datanodes are reported in the
   // pipeline.
   public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
       "hdds.scm.safemode.healthy.pipelie.pct";
   public static final double
       HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+  // number of healthy RATIS pipelines (factor ONE or THREE)
+  public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+      "hdds.scm.safemode.min.pipeline";
+  public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
 
   public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
       "hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 2828f6e..f2ca2fa 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -50,6 +50,7 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
   private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+  private final long creationTime;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -64,6 +65,7 @@ public final class Pipeline {
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
+    this.creationTime = System.currentTimeMillis();
   }
 
   /**
@@ -134,6 +136,11 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public boolean isAllocationTimeout() {
+    //TODO: define a system property to control the timeout value
+    return false;
+  }
+
   public void setNodesInOrder(List<DatanodeDetails> nodes) {
     nodesInOrder.set(nodes);
   }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b0a59fa..eff4f35 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -319,15 +319,6 @@
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>hdds.command.status.report.interval</name>
-    <value>60000ms</value>
-    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
-    <description>Time interval of the datanode to send status of command
-      execution. Each datanode periodically the execution status of commands
-      received from SCM to SCM. Unit could be defined with postfix
-      (ns,ms,s,m,h,d)</description>
-  </property>
-  <property>
     <name>hdds.pipeline.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1314,7 +1305,7 @@
 
   <property>
     <name>hdds.scm.safemode.pipeline-availability.check</name>
-    <value>false</value>
+    <value>true</value>
     <tag>HDDS,SCM,OPERATION</tag>
     <description>
       Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1400,6 +1391,25 @@
   </property>
 
   <property>
+    <name>hdds.scm.safemode.pipeline.creation</name>
+    <value>true</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Boolean value to enable background pipeline creation in SCM safe mode.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.scm.safemode.min.pipeline</name>
+    <value>1</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Minimum number of RATIS pipelines required to exit SCM safe mode.
+      Considered only when "hdds.scm.safemode.pipeline.creation" is true.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.lock.max.concurrency</name>
     <value>100</value>
     <tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
index e7f4347..e1dd098 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/PipelineReportPublisher.java
@@ -68,6 +68,7 @@ public class PipelineReportPublisher extends
 
   @Override
   protected PipelineReportsProto getReport() {
+    System.out.println("Generating pipeline report");
     return getContext().getParent().getContainer().getPipelineReport();
   }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c9eb702..926f19c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteContainerCommandHandler;
@@ -126,6 +130,8 @@ public class DatanodeStateMachine implements Closeable {
             conf))
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .addHandler(new DeleteContainerCommandHandler())
+        .addHandler(new ClosePipelineCommandHandler())
+        .addHandler(new CreatePipelineCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
@@ -486,4 +492,12 @@ public class DatanodeStateMachine implements Closeable {
   public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public CertificateClient getCertificateClient() {
+    return dnCertClient;
+  }
 }
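
The two handlers registered above plug into the existing CommandDispatcher,
which routes each SCMCommand to the single handler registered for its command
type. A minimal sketch of that dispatch mechanism follows; the class and its
internals are illustrative, not the actual CommandDispatcher code (Ozone
imports are assumed alongside the java.util ones shown):

    import java.util.EnumMap;
    import java.util.Map;

    // Illustrative type-keyed dispatcher: one handler per SCMCommandProto.Type.
    class MiniDispatcher {
      private final Map<SCMCommandProto.Type, CommandHandler> handlers =
          new EnumMap<>(SCMCommandProto.Type.class);

      MiniDispatcher addHandler(CommandHandler handler) {
        // A later registration for the same type replaces the earlier one.
        handlers.put(handler.getCommandType(), handler);
        return this;
      }

      void dispatch(SCMCommand command, OzoneContainer container,
          StateContext context, SCMConnectionManager connections) {
        CommandHandler handler = handlers.get(command.getType());
        if (handler != null) {
          handler.handle(command, container, context, connections);
        }
      }
    }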
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 2dec08f..26031d5 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
 
-  private int invocationCount;
+  private AtomicLong invocationCount = new AtomicLong(0);
   private long totalTime;
 
   /**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
-    invocationCount++;
+    invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     final DatanodeDetails datanodeDetails = context.getParent()
         .getDatanodeDetails();
@@ -161,7 +162,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public int getInvocationCount() {
-    return invocationCount;
+    return (int)invocationCount.get();
   }
 
   /**
@@ -171,8 +172,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
     }
     return 0;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..a31387a
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+    final ClosePipelineCommandProto closeCommand =
+        ((ClosePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.removeGroup(pipelineID);
+      context.getParent().triggerHeartbeat();
+      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+          dn.getUuidString());
+    } catch (IOException e) {
+      LOG.error("Can't close pipeline #{}", pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 1ea0ea8..dca02f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
   default void updateCommandStatus(StateContext context, SCMCommand command,
       Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
     if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
-      log.debug("{} with Id:{} not found.", command.getType(),
+      log.warn("{} with Id:{} not found.", command.getType(),
           command.getId());
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..3a60d7e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a createPipelineCommand handler.
+   */
+  public CreatePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent()
+        .getDatanodeDetails();
+    final CreatePipelineCommandProto createCommand =
+        ((CreatePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
+    Collection<DatanodeDetails> peers =
+        createCommand.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.addGroup(pipelineID, peers);
+      LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+          createCommand.getType(), createCommand.getFactor(), pipelineID,
+          dn.getUuidString());
+      // Trigger heartbeat report
+      context.addReport(context.getParent().getContainer().getPipelineReport());
+      context.getParent().triggerHeartbeat();
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+    } catch (IOException e) {
+      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+          createCommand.getFactor(), pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
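
Both new handlers follow the same shape: count the invocation, perform the
group operation against the local write channel, then trigger a heartbeat so
SCM learns the outcome quickly. A hedged sketch of driving one directly,
where ozoneContainer, stateContext, connectionManager and datanodeList are
assumed to be in scope (in the real code they come from the datanode state
machine, not constructed by hand):

    CreatePipelineCommand cmd = new CreatePipelineCommand(
        PipelineID.randomId(), ReplicationType.RATIS,
        ReplicationFactor.THREE, datanodeList);

    CreatePipelineCommandHandler handler = new CreatePipelineCommandHandler();
    handler.handle(cmd, ozoneContainer, stateContext, connectionManager);

    // Metrics are updated even when the group operation fails.
    assert handler.getInvocationCount() == 1;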
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(deleteContainerCommand);
         break;
+      case createPipelineCommand:
+        CreatePipelineCommand createPipelineCommand =
+            CreatePipelineCommand.getFromProtobuf(
+                commandResponseProto.getCreatePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM create pipeline request {}",
+              createPipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(createPipelineCommand);
+        break;
+      case closePipelineCommand:
+        ClosePipelineCommand closePipelineCommand =
+            ClosePipelineCommand.getFromProtobuf(
+                commandResponseProto.getClosePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM close pipeline request {}",
+              closePipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(closePipelineCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 4e0d343..01f463c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
@@ -60,6 +62,22 @@ public interface XceiverServerSpi {
    */
   boolean isExist(HddsProtos.PipelineID pipelineId);
 
+
+  /**
+   * Join a new pipeline.
+   */
+  default void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+  }
+
+
+  /**
+   * Exit a pipeline.
+   */
+  default void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+  }
+
   /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.
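
Making addGroup and removeGroup default no-ops keeps every existing
XceiverServerSpi implementation source-compatible; only the Ratis server in
the next file overrides them. Callers therefore stay transport-agnostic, as
in this sketch (pipelineIdProto and peers assumed in scope, inside code that
already handles IOException):

    // On a non-Ratis write channel both calls inherit the no-op defaults,
    // so pipeline commands degrade gracefully instead of failing.
    XceiverServerSpi channel = ozoneContainer.getWriteChannel();
    channel.addGroup(pipelineIdProto, peers);
    channel.removeGroup(pipelineIdProto);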
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 80e91cd..9733b8e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -66,6 +66,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
@@ -617,6 +618,41 @@ public final class XceiverServerRatis extends XceiverServer {
     return pipelineIDs;
   }
 
+  @Override
+  public void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+    final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId);
+    final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
+    final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
+    GroupManagementRequest request = GroupManagementRequest.newAdd(
+        clientId, server.getId(), nextCallId(), group);
+
+    RaftClientReply reply;
+    try {
+      reply = server.groupManagement(request);
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
+  }
+
+  @Override
+  public void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+    GroupManagementRequest request = GroupManagementRequest.newRemove(
+        clientId, server.getId(), nextCallId(),
+        RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId()),
+        true);
+
+    RaftClientReply reply;
+    try {
+      reply = server.groupManagement(request);
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
+  }
+
   void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(groupId, roleInfoProto);
   }
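
Design note: pipeline setup and teardown now happen on each datanode's own
RaftServer via GroupManagementRequest, replacing the client-side calls
previously made from SCM through RatisPipelineUtils (removed in this commit).
The reply still has to be checked; processReply is defined elsewhere in this
class, but a plausible equivalent looks like this (hypothetical helper, shown
only to make the contract clear):

    // Hypothetical stand-in for processReply: surface a failed
    // group-management reply as an IOException.
    private static void checkReply(RaftClientReply reply) throws IOException {
      if (reply == null || !reply.isSuccess()) {
        throw new IOException("Ratis group management request failed: "
            + reply);
      }
    }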
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ */
+public class ClosePipelineCommand
+    extends SCMCommand<ClosePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+
+  public ClosePipelineCommand(final PipelineID pipelineID) {
+    super();
+    this.pipelineID = pipelineID;
+  }
+
+  public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  @Override
+  public ClosePipelineCommandProto getProto() {
+    ClosePipelineCommandProto.Builder builder =
+        ClosePipelineCommandProto.newBuilder();
+    builder.setCmdId(getId());
+    builder.setPipelineID(pipelineID.getProtobuf());
+    return builder.build();
+  }
+
+  public static ClosePipelineCommand getFromProtobuf(
+      ClosePipelineCommandProto closePipelineProto) {
+    Preconditions.checkNotNull(closePipelineProto);
+    return new ClosePipelineCommand(closePipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(closePipelineProto.getPipelineID()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
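
Since the command is defined entirely by its proto, a quick round trip using
only the methods above shows the encode/decode contract (sketch; assumes
PipelineID implements value equality, which SCM relies on elsewhere):

    ClosePipelineCommand original =
        new ClosePipelineCommand(PipelineID.randomId());
    ClosePipelineCommandProto proto = original.getProto();
    ClosePipelineCommand decoded =
        ClosePipelineCommand.getFromProtobuf(proto);

    // The pipeline id and command id survive the round trip.
    assert decoded.getPipelineID().equals(original.getPipelineID());
    assert decoded.getId() == original.getId();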
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ */
+public class CreatePipelineCommand
+    extends SCMCommand<CreatePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+  private final ReplicationFactor factor;
+  private final ReplicationType type;
+  private final List<DatanodeDetails> nodelist;
+
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super();
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  @Override
+  public CreatePipelineCommandProto getProto() {
+    return CreatePipelineCommandProto.newBuilder()
+        .setCmdId(getId())
+        .setPipelineID(pipelineID.getProtobuf())
+        .setFactor(factor)
+        .setType(type)
+        .addAllDatanode(nodelist.stream()
+            .map(DatanodeDetails::getProtoBufMessage)
+            .collect(Collectors.toList()))
+        .build();
+  }
+
+  public static CreatePipelineCommand getFromProtobuf(
+      CreatePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new CreatePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()),
+        createPipelineProto.getType(), createPipelineProto.getFactor(),
+        createPipelineProto.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index a975cd5..c8ef6c1 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -282,6 +282,8 @@ message SCMCommandProto {
     closeContainerCommand = 3;
     deleteContainerCommand = 4;
     replicateContainerCommand = 5;
+    createPipelineCommand = 6;
+    closePipelineCommand = 7;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -290,6 +292,8 @@ message SCMCommandProto {
   optional CloseContainerCommandProto closeContainerCommandProto = 4;
   optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+  optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+  optional ClosePipelineCommandProto closePipelineCommandProto = 8;
 }
 
 /**
@@ -359,6 +363,25 @@ message ReplicateContainerCommandProto {
 }
 
 /**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required ReplicationType type = 2;
+  required ReplicationFactor factor = 3;
+  repeated DatanodeDetailsProto datanode = 4;
+  required int64 cmdId = 5;
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required int64 cmdId = 2;
+}
+
+/**
  * Protocol used from a datanode to StorageContainerManager.
  *
  * Please see the request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 4c182c3..9e815c8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import javax.management.ObjectName;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -190,6 +191,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+          // wait until pipeline is ready
+          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7dde8d7..78a944f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -249,18 +249,19 @@ public class ContainerStateManager {
       throws IOException {
 
     Pipeline pipeline;
-    try {
-      // TODO: #CLUTIL remove creation logic when all replication types and
-      // factors are handled by pipeline creator job.
-      pipeline = pipelineManager.createPipeline(type, replicationFactor);
-    } catch (IOException e) {
-      final List<Pipeline> pipelines = pipelineManager
-          .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
-      if (pipelines.isEmpty()) {
+    final List<Pipeline> pipelines = pipelineManager
+        .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
+    if (pipelines.isEmpty()) {
+      try {
+        pipeline = pipelineManager.createPipeline(type, replicationFactor);
+        pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+      } catch (IOException e) {
+        LOG.error("Fail to create pipeline for " + e.getMessage());
         throw new IOException("Could not allocate container. Cannot get any" +
             " matching pipeline for Type:" + type +
             ", Factor:" + replicationFactor + ", State:PipelineState.OPEN");
       }
+    } else {
       pipeline = pipelines.get((int) containerCount.get() % pipelines.size());
     }
     synchronized (pipeline) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..6de05fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,15 +98,14 @@ public final class SCMEvents {
           new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
 
   /**
-   * PipelineReport processed by pipeline report handler. This event is
+   * Open pipeline event sent by PipelineReportHandler. This event is
    * received by HealthyPipelineSafeModeRule.
    */
-  public static final TypedEvent<PipelineReportFromDatanode>
-      PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
-          PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+  public static final TypedEvent<Pipeline>
+      OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
 
   /**
-   * PipelineActions are sent by Datanode. This event is received by
+   * PipelineActions are sent by Datanode to close a pipeline. It's received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
    */
   public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +113,7 @@ public final class SCMEvents {
       "Pipeline_Actions");
 
   /**
-   * A Command status report will be sent by datanodes. This repoort is received
+   * A Command status report will be sent by datanodes. This report is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
    */
   public static final TypedEvent<CommandStatusReportFromDatanode>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
-      Configuration conf, GrpcTlsConfig tlsConfig) {
+      Configuration conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+        new RatisPipelineProvider(nodeManager, stateManager, conf,
+            eventPublisher));
   }
 
   @VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
     return providers.get(type).create(factor, nodes);
   }
 
+  public void close(ReplicationType type, Pipeline pipeline)
+      throws IOException {
+    providers.get(type).close(pipeline);
+  }
+
   public void shutdown() {
     providers.values().forEach(provider -> provider.shutdown());
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..779008f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -51,6 +50,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
       ReplicationFactor factor);
 
   List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state);
+
+  List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
   List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -95,5 +97,14 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
    */
   void deactivatePipeline(PipelineID pipelineID) throws IOException;
 
-  GrpcTlsConfig getGrpcTlsConfig();
+  /**
+   * Wait for a pipeline to reach the OPEN state.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds; 0 means use the default
+   * @throws IOException in case of any failure, such as a timeout
+   */
+  default void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+  }
 }
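
The interface only promises the waiting behaviour; the SCMPipelineManager
implementation is not part of this hunk. One plausible shape for it, shown
purely as a sketch (DEFAULT_PIPELINE_WAIT_MS is a made-up constant):

    @Override
    public void waitPipelineReady(PipelineID pipelineID, long timeout)
        throws IOException {
      long deadline = System.currentTimeMillis()
          + (timeout > 0 ? timeout : DEFAULT_PIPELINE_WAIT_MS);
      while (System.currentTimeMillis() < deadline) {
        // getPipeline() throws PipelineNotFoundException if it was removed.
        if (getPipeline(pipelineID).isOpen()) {
          return;
        }
        try {
          Thread.sleep(100);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for pipeline "
              + pipelineID, e);
        }
      }
      throw new IOException("Pipeline " + pipelineID
          + " did not become OPEN before the timeout");
    }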
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
+  void close(Pipeline pipeline) throws IOException;
+
   void shutdown();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 793f4e2..57e59b2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Handles Pipeline Reports from datanode.
@@ -52,17 +53,14 @@ public class PipelineReportHandler implements
   private final boolean pipelineAvailabilityCheck;
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
-      PipelineManager pipelineManager,
-      Configuration conf) {
+      PipelineManager pipelineManager, Configuration conf) {
     Preconditions.checkNotNull(pipelineManager);
-    Objects.requireNonNull(scmSafeModeManager);
     this.scmSafeModeManager = scmSafeModeManager;
     this.pipelineManager = pipelineManager;
     this.conf = conf;
     this.pipelineAvailabilityCheck = conf.getBoolean(
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
-
   }
 
   @Override
@@ -79,38 +77,40 @@ public class PipelineReportHandler implements
     }
     for (PipelineReport report : pipelineReport.getPipelineReportList()) {
       try {
-        processPipelineReport(report, dn);
+        processPipelineReport(report, dn, publisher);
       } catch (IOException e) {
         LOGGER.error("Could not process pipeline report={} from dn={} {}",
             report, dn, e);
       }
     }
-    if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
-      publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          pipelineReportFromDatanode);
-    }
-
   }
 
-  private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
-      throws IOException {
+  private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+      EventPublisher publisher) throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
     Pipeline pipeline;
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
-          pipelineManager.getGrpcTlsConfig());
+      final ClosePipelineCommand closeCommand =
+          new ClosePipelineCommand(pipelineID);
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(dn.getUuid(), closeCommand);
+      publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
       return;
     }
 
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-      LOGGER.info("Pipeline {} reported by {}", pipeline.getId(), dn);
+      LOGGER.info("Pipeline {} {} reported by {}", pipeline.getFactor(),
+          pipeline.getId(), dn);
       pipeline.reportDatanode(dn);
       if (pipeline.isHealthy()) {
         // if all the dns have reported, pipeline can be moved to OPEN state
         pipelineManager.openPipeline(pipelineID);
       }
+      if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+        publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+      }
     } else {
       // In OPEN state case just report the datanode
       pipeline.reportDatanode(dn);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 7615057..93fbbd1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -131,9 +131,9 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
+      LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
       pipeline = pipelineStateMap
           .updatePipelineState(pipelineId, PipelineState.OPEN);
-      LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
     }
     return pipeline;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 0324a58..126b0c7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -69,6 +60,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final EventPublisher eventPublisher;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -83,15 +75,14 @@ public class RatisPipelineProvider implements PipelineProvider {
 
   private final ForkJoinPool forkJoinPool = new ForkJoinPool(
       parallelismForPool, factory, null, false);
-  private final GrpcTlsConfig tlsConfig;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf,
-      GrpcTlsConfig tlsConfig) {
+      EventPublisher eventPublisher) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    this.tlsConfig = tlsConfig;
+    this.eventPublisher = eventPublisher;
   }
 
 
@@ -155,12 +146,25 @@ public class RatisPipelineProvider implements PipelineProvider {
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
         .build();
-    initializePipeline(pipeline);
+
+    // Send command to datanodes to create pipeline
+    final CreatePipelineCommand createCommand =
+        new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+            factor, dns);
+
+    dns.stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), createCommand);
+      LOG.info("Send pipeline:{} create command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
+
     return pipeline;
   }
 
@@ -188,69 +192,23 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-  protected void initializePipeline(Pipeline pipeline) throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    }
-    callRatisRpc(pipeline.getNodes(),
-        (raftClient, peer) -> {
-          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
-          if (reply == null || !reply.isSuccess()) {
-            String msg = "Pipeline initialization failed for pipeline:"
-                + pipeline.getId() + " node:" + peer.getId();
-            LOG.error(msg);
-            throw new IOException(msg);
-          }
-        });
-  }
-
-  private void callRatisRpc(List<DatanodeDetails> datanodes,
-      CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final String rpcType = conf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
-    final List< IOException > exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(conf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(conf);
-    try {
-      forkJoinPool.submit(() -> {
-        datanodes.parallelStream().forEach(d -> {
-          final RaftPeer p = RatisHelper.toRaftPeer(d);
-          try (RaftClient client = RatisHelper
-              .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-                  retryPolicy, maxOutstandingRequests, tlsConfig,
-                  requestTimeout)) {
-            rpc.accept(client, p);
-          } catch (IOException ioe) {
-            String errMsg =
-                "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
-            LOG.error(errMsg, ioe);
-            exceptions.add(new IOException(errMsg, ioe));
-          }
-        });
-      }).get();
-    } catch (ExecutionException | RejectedExecutionException ex) {
-      LOG.error(ex.getClass().getName() + " exception occurred during " +
-          "createPipeline", ex);
-      throw new IOException(ex.getClass().getName() + " exception occurred " +
-          "during createPipeline", ex);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupt exception occurred during " +
-          "createPipeline", ex);
-    }
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
+  /**
+   * Sends a command to close the pipeline on all of its datanodes; the
+   * pipeline itself is removed from SCM by the pipeline manager.
+   *
+   * @param pipeline        - Pipeline to be closed
+   */
+  public void close(Pipeline pipeline) {
+    LOG.debug("Destroy pipeline:{}", pipeline.getId());
+    final ClosePipelineCommand closeCommand =
+        new ClosePipelineCommand(pipeline.getId());
+    pipeline.getNodes().stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), closeCommand);
+      LOG.info("Send pipeline:{} close command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
   }
 }
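
Taken together, create() and close() above make RatisPipelineProvider
purely event-driven: neither method talks to Ratis directly any more. A
condensed sketch of the fan-out both methods share; sendToAll is a
hypothetical helper name, and eventPublisher is assumed in scope:

    // Hypothetical helper: fan one SCM command out to every node of a
    // pipeline via the DATANODE_COMMAND event, as create()/close() do.
    private void sendToAll(Pipeline pipeline, SCMCommand command) {
      pipeline.getNodes().forEach(node -> {
        final CommandForDatanode datanodeCommand =
            new CommandForDatanode<>(node.getUuid(), command);
        eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
      });
    }
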
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 20fa092..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisPipelineUtils.class);
-
-  private RatisPipelineUtils() {
-  }
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    }
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(ozoneConf);
-    try(RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
-            requestTimeout)) {
-      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
-          true, p.getId());
-    }
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..c77b88b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -81,18 +83,21 @@ public class SCMPipelineManager implements PipelineManager {
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
   private final Configuration conf;
+  private boolean pipelineAvailabilityCheck;
+  private boolean createPipelineInSafemode;
+  private Set<PipelineID> oldRatisThreeFactorPipelineIDSet = new HashSet<>();
+  private long pipelineWaitDefaultTimeout;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
-  private GrpcTlsConfig grpcTlsConfig;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
-      EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+      EventPublisher eventPublisher)
       throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
-        conf, grpcTlsConfig);
+        conf, eventPublisher);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -113,8 +118,17 @@ public class SCMPipelineManager implements PipelineManager {
     this.metrics = SCMPipelineMetrics.create();
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
+    this.pipelineAvailabilityCheck = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
+    this.createPipelineInSafemode = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
     initializePipelineState();
-    this.grpcTlsConfig = grpcTlsConfig;
   }
 
   public PipelineStateManager getStateManager() {
@@ -127,9 +141,16 @@ public class SCMPipelineManager implements PipelineManager {
     pipelineFactory.setProvider(replicationType, provider);
   }
 
+  public Set<PipelineID> getOldPipelineIdSet() {
+    return oldRatisThreeFactorPipelineIDSet;
+  }
+
   private void initializePipelineState() throws IOException {
     if (pipelineStore.isEmpty()) {
       LOG.info("No pipeline exists in current db");
+      if (pipelineAvailabilityCheck && createPipelineInSafemode) {
+        startPipelineCreator();
+      }
       return;
     }
     List<Map.Entry<byte[], byte[]>> pipelines =
@@ -144,12 +165,16 @@ public class SCMPipelineManager implements PipelineManager {
       Preconditions.checkNotNull(pipeline);
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
+      if (pipeline.getType() == ReplicationType.RATIS &&
+          pipeline.getFactor() == ReplicationFactor.THREE) {
+        oldRatisThreeFactorPipelineIDSet.add(pipeline.getId());
+      }
     }
   }
 
   @Override
-  public synchronized Pipeline createPipeline(
-      ReplicationType type, ReplicationFactor factor) throws IOException {
+  public synchronized Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +182,11 @@ public class SCMPipelineManager implements PipelineManager {
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
+      metrics.incNumPipelineAllocated();
+      if (pipeline.isOpen()) {
+        metrics.incNumPipelineCreated();
+        metrics.createPerPipelineMetrics(pipeline);
+      }
       return pipeline;
     } catch (InsufficientDatanodesException idEx) {
       throw idEx;
@@ -225,6 +253,16 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,6 +331,7 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = stateManager.openPipeline(pipelineId);
+      metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
     } finally {
       lock.writeLock().unlock();
@@ -380,6 +419,44 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
+   * Waits for a pipeline to reach the OPEN state.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds; 0 means the default
+   *                   (the pipeline report interval)
+   * @throws IOException if the pipeline is missing or not OPEN in time
+   */
+  @Override
+  public void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+    Pipeline pipeline;
+    try {
+      pipeline = stateManager.getPipeline(pipelineID);
+    } catch (PipelineNotFoundException e) {
+      throw new IOException(String.format("Pipeline %s cannot be found",
+          pipelineID));
+    }
+
+    boolean ready;
+    long st = Time.monotonicNow();
+    if (timeout == 0) {
+      timeout = pipelineWaitDefaultTimeout;
+    }
+    for (ready = pipeline.isOpen();
+        !ready && Time.monotonicNow() - st < timeout;
+        ready = pipeline.isOpen()) {
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of swallowing it.
+        Thread.currentThread().interrupt();
+      }
+    }
+    if (!ready) {
+      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+          pipelineID, timeout));
+    }
+  }
+
+  /**
    * Moves the pipeline to CLOSED state and sends close container command for
    * all the containers in the pipeline.
    *
@@ -408,7 +485,7 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   private void destroyPipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+    pipelineFactory.close(pipeline.getType(), pipeline);
     // remove the pipeline from the pipeline manager
     removePipeline(pipeline.getId());
     triggerPipelineCreation();
@@ -441,11 +518,6 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public GrpcTlsConfig getGrpcTlsConfig() {
-    return grpcTlsConfig;
-  }
-
-  @Override
   public void close() throws IOException {
     if (scheduler != null) {
       scheduler.close();
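
Because create() now returns pipelines in ALLOCATED state, callers that
previously assumed an OPEN pipeline must wait explicitly. A minimal usage
sketch, assuming a PipelineManager reference; a timeout of 0 falls back to
the pipeline report interval:

    Pipeline pipeline = pipelineManager.createPipeline(
        ReplicationType.RATIS, ReplicationFactor.THREE); // ALLOCATED
    // Blocks until the datanodes report in and the pipeline moves to
    // OPEN, or throws IOException on timeout.
    pipelineManager.waitPipelineReady(pipeline.getId(), 0);
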
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6e..cd80010 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -46,6 +46,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
 
   private MetricsRegistry registry;
 
+  private @Metric MutableCounterLong numPipelineAllocated;
   private @Metric MutableCounterLong numPipelineCreated;
   private @Metric MutableCounterLong numPipelineCreationFailed;
   private @Metric MutableCounterLong numPipelineDestroyed;
@@ -83,6 +84,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   @SuppressWarnings("SuspiciousMethodCalls")
   public void getMetrics(MetricsCollector collector, boolean all) {
     MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+    numPipelineAllocated.snapshot(recordBuilder, true);
     numPipelineCreated.snapshot(recordBuilder, true);
     numPipelineCreationFailed.snapshot(recordBuilder, true);
     numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -117,6 +119,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Increments the pipeline allocation count, covering both successful
+   * and failed allocations.
+   */
+  void incNumPipelineAllocated() {
+    numPipelineAllocated.incr();
+  }
+
+  /**
    * Increments number of successful pipeline creation count.
    */
   void incNumPipelineCreated() {
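
Note on the metric split above: numPipelineAllocated is bumped when SCM
allocates a pipeline, while numPipelineCreated is now bumped only once the
pipeline is opened (see the openPipeline hunk). So, roughly, within a
single SCM process lifetime:

    pipelines still waiting on datanodes
        ~= numPipelineAllocated - numPipelineCreated
           - numPipelineCreationFailed
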
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..00cb7ae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
   }
 
   @Override
+  public void close(Pipeline pipeline) throws IOException {
+    // Do nothing.
+  }
+
+  @Override
   public void shutdown() {
     // Do nothing.
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..28f1200 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,17 +20,10 @@ package org.apache.hadoop.hdds.scm.safemode;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -38,9 +31,6 @@ import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * Class defining Safe mode exit criteria for Pipelines.
  *
@@ -49,43 +39,52 @@ import java.util.Set;
  * through in a cluster.
  */
 public class HealthyPipelineSafeModeRule
-    extends SafeModeExitRule<PipelineReportFromDatanode>{
+    extends SafeModeExitRule<Pipeline>{
 
   public static final Logger LOG =
       LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
-  private final PipelineManager pipelineManager;
   private final int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Set<DatanodeDetails> processedDatanodeDetails =
-      new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager manager, Configuration configuration) {
     super(manager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
     double healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
             HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
 
+    int minHealthyPipelines = 0;
+
+    boolean createPipelineInSafemode = configuration.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+    if (createPipelineInSafemode) {
+      minHealthyPipelines =
+          configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+              HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+    }
+
     Preconditions.checkArgument(
         (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
         HddsConfigKeys.
             HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
             + " value should be >= 0.0 and <= 1.0");
 
-    // As we want to wait for 3 node pipelines
-    int pipelineCount =
+    // We want to wait for RATIS write pipelines, regardless of whether
+    // the factor is ONE or THREE
+    int pipelineCount = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS, Pipeline.PipelineState.OPEN).size() +
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+            Pipeline.PipelineState.ALLOCATED).size();
 
     // This value will be zero when pipeline count is 0.
     // On a fresh installed cluster, there will be zero pipelines in the SCM
     // pipeline DB.
-    healthyPipelineThresholdCount =
-        (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+    healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
 
     LOG.info(" Total pipeline count is {}, healthy pipeline " +
         "threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -95,8 +94,8 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
@@ -108,54 +107,27 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
+  protected void process(Pipeline pipeline) {
 
     // When SCM is in safe mode for long time, already registered
-    // datanode can send pipeline report again, then pipeline handler fires
-    // processed report event, we should not consider this pipeline report
-    // from datanode again during threshold calculation.
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
-    if (!processedDatanodeDetails.contains(
-        pipelineReportFromDatanode.getDatanodeDetails())) {
-
-      Pipeline pipeline;
-      PipelineReportsProto pipelineReport =
-          pipelineReportFromDatanode.getReport();
-
-      for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-        PipelineID pipelineID = PipelineID
-            .getFromProtobuf(report.getPipelineID());
-        try {
-          pipeline = pipelineManager.getPipeline(pipelineID);
-        } catch (PipelineNotFoundException e) {
-          continue;
-        }
-
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
-          // If the pipeline is open state mean, all 3 datanodes are reported
-          // for this pipeline.
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-        }
-      }
-      if (scmInSafeMode()) {
-        SCMSafeModeManager.getLogger().info(
-            "SCM in safe mode. Healthy pipelines reported count is {}, " +
-                "required healthy pipeline reported count is {}",
-            currentHealthyPipelineCount, healthyPipelineThresholdCount);
-      }
-
-      processedDatanodeDetails.add(dnDetails);
+    // datanodes can send pipeline reports again, and SCMPipelineManager can
+    // also create new pipelines; both surface here as OPEN_PIPELINE events.
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS) {
+      getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+      currentHealthyPipelineCount++;
     }
 
+    if (scmInSafeMode()) {
+      SCMSafeModeManager.getLogger().info(
+          "SCM in safe mode. Healthy pipelines reported count is {}, " +
+              "required healthy pipeline reported count is {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount);
+    }
   }
 
   @Override
   protected void cleanup() {
-    processedDatanodeDetails.clear();
   }
 
   @VisibleForTesting
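
A worked example of the threshold computed above, assuming a 10% threshold
(healthyPipelinesPercent = 0.1), 12 RATIS pipelines in OPEN or ALLOCATED
state, and safe-mode pipeline creation enabled with a minimum of, say, 3:

    healthyPipelineThresholdCount
        = max(minHealthyPipelines, ceil(0.1 * 12))
        = max(3, 2)
        = 3

With safe-mode pipeline creation disabled, minHealthyPipelines stays 0 and
the threshold would be 2.
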
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..abf012d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,11 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
-    PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -47,22 +41,24 @@ import java.util.Set;
  * replica available for read when we exit safe mode.
  */
 public class OneReplicaPipelineSafeModeRule extends
-    SafeModeExitRule<PipelineReportFromDatanode> {
+    SafeModeExitRule<Pipeline> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
 
   private int thresholdCount;
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
-  private final PipelineManager pipelineManager;
-  private int currentReportedPipelineCount = 0;
+  private Set<PipelineID> oldPipelineIDSet;
+  private int oldPipelineReportedCount = 0;
+  private int oldPipelineThresholdCount = 0;
+  private int newPipelineThresholdCount = 0;
+  private int newPipelineReportedCount = 0;
 
 
   public OneReplicaPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager safeModeManager, Configuration configuration) {
     super(safeModeManager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
 
     double percent =
         configuration.getDouble(
@@ -75,69 +71,68 @@ public class OneReplicaPipelineSafeModeRule extends
             HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
+    oldPipelineIDSet =
+        ((SCMPipelineManager)pipelineManager).getOldPipelineIdSet();
     int totalPipelineCount =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+        HddsProtos.ReplicationFactor.THREE).size();
+    Preconditions.checkState(totalPipelineCount >= oldPipelineIDSet.size());
 
-    thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
+    oldPipelineThresholdCount =
+        (int) Math.ceil(percent * oldPipelineIDSet.size());
+    newPipelineThresholdCount = (int) Math.ceil(
+        percent * (totalPipelineCount - oldPipelineIDSet.size()));
 
-    LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+    thresholdCount = oldPipelineThresholdCount + newPipelineThresholdCount;
+
+    LOG.info("Total pipeline count is {}, pipeline's with at least one " +
         "datanode reported threshold count is {}", totalPipelineCount,
         thresholdCount);
 
     getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
         thresholdCount);
-
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
   protected boolean validate() {
-    if (currentReportedPipelineCount >= thresholdCount) {
+    if (newPipelineReportedCount + oldPipelineReportedCount >= thresholdCount) {
       return true;
     }
     return false;
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
-    Pipeline pipeline;
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID
-          .getFromProtobuf(report.getPipelineID());
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-          !reportedPipelineIDSet.contains(pipelineID)) {
-        reportedPipelineIDSet.add(pipelineID);
+  protected void process(Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+        !reportedPipelineIDSet.contains(pipeline.getId())) {
+      if (oldPipelineIDSet.contains(pipeline.getId()) &&
+          oldPipelineReportedCount < oldPipelineThresholdCount) {
         getSafeModeMetrics()
             .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+        oldPipelineReportedCount++;
+        reportedPipelineIDSet.add(pipeline.getId());
+      } else if (newPipelineReportedCount < newPipelineThresholdCount) {
+        getSafeModeMetrics()
+            .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+        newPipelineReportedCount++;
+        reportedPipelineIDSet.add(pipeline.getId());
       }
     }
 
-    currentReportedPipelineCount = reportedPipelineIDSet.size();
-
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. Pipelines with atleast one datanode reported " +
-              "count is {}, required atleast one datanode reported per " +
+          "SCM in safe mode. Pipelines with at least one datanode reported " +
+              "count is {}, required at least one datanode reported per " +
               "pipeline count is {}",
-          currentReportedPipelineCount, thresholdCount);
+          newPipelineReportedCount + oldPipelineReportedCount, thresholdCount);
     }
-
   }
 
   @Override
@@ -152,7 +147,6 @@ public class OneReplicaPipelineSafeModeRule extends
 
   @VisibleForTesting
   public int getCurrentReportedPipelineCount() {
-    return currentReportedPipelineCount;
+    return newPipelineReportedCount + oldPipelineReportedCount;
   }
-
-}
+}
\ No newline at end of file
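
A worked example of the split threshold above, assuming percent = 0.9 and
10 factor-THREE RATIS pipelines in total, of which 4 were loaded from the
pipeline DB (the old set):

    oldPipelineThresholdCount = ceil(0.9 * 4)        = 4
    newPipelineThresholdCount = ceil(0.9 * (10 - 4)) = 6
    thresholdCount            = 4 + 6                = 10

In process(), old pipelines fill the old bucket first and spill over into
the new one, while newly created pipelines only count toward the new
bucket.
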
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..161963e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -59,17 +60,17 @@ import org.slf4j.LoggerFactory;
  * number of datanode registered is met or not.
  *
  * 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and check if pipeline is healthy
  * and increments current healthy pipeline count. Then validate it cutoff
  * threshold for healthy pipeline is met or not.
  *
  * 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and add the reported pipeline to
  * reported pipeline set. Then validate it cutoff threshold for one replica
  * per pipeline is met or not.
@@ -97,6 +98,7 @@ public class SCMSafeModeManager {
   private final PipelineManager pipelineManager;
 
   private final SafeModeMetrics safeModeMetrics;
+  private boolean createPipelineInSafeMode = false;
 
   public SCMSafeModeManager(Configuration conf,
       List<ContainerInfo> allContainers, PipelineManager pipelineManager,
@@ -134,6 +136,9 @@ public class SCMSafeModeManager {
         exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
             oneReplicaPipelineSafeModeRule);
       }
+      createPipelineInSafeMode = conf.getBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
       emitSafeModeStatus();
     } else {
       this.safeModeMetrics = null;
@@ -166,6 +171,7 @@ public class SCMSafeModeManager {
 
     if (exitRules.get(ruleName) != null) {
       validatedRules.add(ruleName);
+      LOG.info("{} rule is successfully validated", ruleName);
     } else {
       // This should never happen
       LOG.error("No Such Exit rule {}", ruleName);
@@ -190,6 +196,18 @@ public class SCMSafeModeManager {
    */
   @VisibleForTesting
   public void exitSafeMode(EventPublisher eventQueue) {
+    // Wait a while so that as many new pipelines as possible become ready
+    if (createPipelineInSafeMode) {
+      long sleepTime = config.getTimeDuration(
+          HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+          HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+          TimeUnit.MILLISECONDS);
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of swallowing it.
+        Thread.currentThread().interrupt();
+      }
+    }
+
     LOG.info("SCM exiting safe mode.");
     setInSafeMode(false);
 
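
A sketch of tuning the two safe-mode knobs this class now reads, via
OzoneConfiguration; the 60-second value is illustrative, not a shipped
default:

    OzoneConfiguration conf = new OzoneConfiguration();
    // Let SCM create pipelines while still in safe mode.
    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, true);
    // Linger after the exit rules pass so fresh pipelines can reach OPEN.
    conf.setTimeDuration(
        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
        60, TimeUnit.SECONDS);
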
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index b9e5333..6128266 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -129,7 +129,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
     List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
     pipelineList.forEach((pipeline) -> {
       try {
-        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+            pipeline.isAllocationTimeout()) {
           scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
         }
       } catch (IOException ex) {
@@ -142,6 +143,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
   public boolean getSafeModeStatus() {
     return isInSafeMode.get();
   }
-
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
       }
 
       if (heartbeat.getCommandStatusReportsCount() != 0) {
+        LOG.debug("Dispatching Command Status Report.");
         for (CommandStatusReportsProto commandStatusReport : heartbeat
             .getCommandStatusReportsList()) {
           eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 530c0a6..901bc2c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -79,6 +81,12 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .closePipelineCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -329,6 +337,18 @@ public class SCMDatanodeProtocolServer implements
           .setReplicateContainerCommandProto(
               ((ReplicateContainerCommand)cmd).getProto())
           .build();
+    case createPipelineCommand:
+      return builder
+          .setCommandType(createPipelineCommand)
+          .setCreatePipelineCommandProto(
+              ((CreatePipelineCommand)cmd).getProto())
+          .build();
+    case closePipelineCommand:
+      return builder
+          .setCommandType(closePipelineCommand)
+          .setClosePipelineCommandProto(
+              ((ClosePipelineCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Scm command " +
           cmd.getType().toString() + " is not implemented");
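
For completeness, a sketch of what the new createPipelineCommand case
serializes, assuming builder is the SCMCommandProto.Builder already in
scope in this method and dns is the pipeline's datanode list:

    CreatePipelineCommand cmd = new CreatePipelineCommand(
        pipeline.getId(), pipeline.getType(), pipeline.getFactor(), dns);
    // Wrapped into the heartbeat response for the target datanode.
    SCMCommandProto proto = builder
        .setCommandType(createPipelineCommand)
        .setCreatePipelineCommandProto(cmd.getProto())
        .build();
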
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index af65e13..c45c15d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -402,8 +402,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue,
-              grpcTlsConfig);
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
     }
 
     if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 37321d7..8d71fe5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -360,6 +362,17 @@ public final class TestUtils {
     return new PipelineReportFromDatanode(dn, reportBuilder.build());
   }
 
+  public static void openAllRatisPipelines(PipelineManager pipelineManager)
+      throws IOException {
+    // Pipelines are created by a background thread
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+    // Move each pipeline from ALLOCATED to OPEN state
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.openPipeline(pipeline.getId());
+    }
+  }
+
   public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
       DatanodeDetails dn, PipelineID... pipelineIDs) {
     PipelineActionsProto.Builder actionsProtoBuilder =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index a8364a4..b27f8a3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,7 +24,9 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -67,7 +69,7 @@ public class TestCloseContainerEventHandler {
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
+        new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration);
@@ -80,6 +82,8 @@ public class TestCloseContainerEventHandler {
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+    // Move all pipelines created by the background thread from ALLOCATED
+    // to OPEN state
+    TestUtils.openAllRatisPipelines(pipelineManager);
   }
 
   @AfterClass
@@ -115,7 +119,9 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithValidContainers() throws IOException {
-
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
+    pipelineManager.openPipeline(pipeline.getId());
     ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, "ozone");
@@ -133,7 +139,9 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithRatis() throws IOException {
-
+    Pipeline pipeline = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    pipelineManager.openPipeline(pipeline.getId());
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
     ContainerInfo container = containerManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 75a1ad3..c203173 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -94,7 +94,7 @@ public class TestSCMContainerManager {
     }
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     containerManager = new SCMContainerManager(conf, nodeManager,
         pipelineManager, new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
@@ -147,7 +147,7 @@ public class TestSCMContainerManager {
           containerInfo.getPipelineID()).getFirstNode()
           .getUuid());
     }
-    Assert.assertTrue(pipelineList.size() > 5);
+    Assert.assertTrue(pipelineList.size() >= 1);
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 26ffd8d..43c174e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -107,7 +107,7 @@ public class TestContainerPlacement {
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     PipelineManager pipelineManager =
-        new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
+        new SCMPipelineManager(config, scmNodeManager, eventQueue);
     return new SCMContainerManager(config, scmNodeManager, pipelineManager,
         eventQueue);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..14c24e0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -73,6 +73,7 @@ public class TestDeadNodeHandler {
   private SCMNodeManager nodeManager;
   private ContainerManager containerManager;
   private NodeReportHandler nodeReportHandler;
+  private SCMPipelineManager pipelineManager;
   private DeadNodeHandler deadNodeHandler;
   private EventPublisher publisher;
   private EventQueue eventQueue;
@@ -87,12 +88,12 @@ public class TestDeadNodeHandler {
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
-    SCMPipelineManager manager =
+    pipelineManager =
         (SCMPipelineManager)scm.getPipelineManager();
     PipelineProvider mockRatisProvider =
-        new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
-            conf);
-    manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
     deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -147,6 +148,8 @@ public class TestDeadNodeHandler {
     nodeManager.register(TestUtils.randomDatanodeDetails(),
         TestUtils.createNodeReport(storageOne), null);
 
+    TestUtils.openAllRatisPipelines(pipelineManager);
+
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerManager);
     ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index db76d66..f4eb797 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
         testDir.getAbsolutePath());
     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
       eq.processAll(1000L);
       List<SCMCommand> command =
           nodemanager.processHeartbeat(datanodeDetails);
-      Assert.assertEquals(1, command.size());
-      Assert
-          .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+      // With the datanode registered, SCM also sends it a create pipeline
+      // command
+      Assert.assertTrue(command.size() >= 1);
+      Assert.assertTrue(command.get(0).getClass().equals(
+          CloseContainerCommand.class) ||
+          command.get(1).getClass().equals(CloseContainerCommand.class));
     } catch (IOException e) {
       e.printStackTrace();
       throw  e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 01c53ba..28a3484 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 
 import java.io.IOException;
 
@@ -31,7 +32,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
-    super(nodeManager, stateManager, conf, null);
+    super(nodeManager, stateManager, conf, new EventQueue());
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 94c3039..3d9f05a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -68,10 +63,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
-
   @Test
   public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
 
@@ -113,10 +107,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
 
@@ -188,10 +182,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
 
 
-      // No datanodes have sent pipelinereport from datanode
+      // No pipeline event has been sent to the SCMSafeModeManager yet
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
 
@@ -225,14 +220,14 @@ public class TestHealthyPipelineSafeModeRule {
           GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
               SCMSafeModeManager.class));
 
-      // fire event with pipeline report with ratis type and factor 1
-      // pipeline, validate() should return false
+      // fire an OPEN_PIPELINE event for the ratis type, factor 1
+      // pipeline; validate() should now return true
       firePipelineEvent(pipeline1, eventQueue);
 
       GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
-          "reported count is 0"),
+          "reported count is 1"),
           1000, 5000);
-      Assert.assertFalse(healthyPipelineSafeModeRule.validate());
+      Assert.assertTrue(healthyPipelineSafeModeRule.validate());
 
       firePipelineEvent(pipeline2, eventQueue);
       firePipelineEvent(pipeline3, eventQueue);
@@ -246,19 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
 
   }
 
-
   private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
-    PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-        .newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-
-    // Here no need to fire event from 3 nodes, as already pipeline is in
-    // open state, but doing it.
-    eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-            pipeline.getNodes().get(0), reportBuilder.build()));
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
-
 }
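
Note: the firePipelineEvent simplification above is the heart of these test
changes: a single OPEN_PIPELINE event per pipeline now feeds the safe mode
rules, replacing the per-datanode PROCESSED_PIPELINE_REPORT messages. A
minimal sketch of the new flow, using only calls that appear in this patch
(healthyPipelineSafeModeRule as obtained in the test):

    Pipeline pipeline = pipelineManager.createPipeline(
        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
    // One event is enough, regardless of replication factor.
    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
    GenericTestUtils.waitFor(
        () -> healthyPipelineSafeModeRule.validate(), 1000, 5000);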
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index ca54d05..0fa5eae 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
     ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().toString());
+    ozoneConfiguration.setBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     List<ContainerInfo> containers = new ArrayList<>();
     containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -71,7 +68,7 @@ public class TestOneReplicaPipelineSafeModeRule {
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
-            eventQueue, null);
+            eventQueue);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
 
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineCountThree - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
-
-
   private void createPipelines(int count,
       HddsProtos.ReplicationFactor factor) throws Exception {
     for (int i = 0; i < count; i++) {
@@ -184,26 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
   }
 
   private void firePipelineEvent(Pipeline pipeline) {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-
-    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(1), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(2), reportBuilder.build()));
-    } else {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-    }
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 247b38a..b5839bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -73,6 +70,8 @@ public class TestSCMSafeModeManager {
   public static void setUp() {
     queue = new EventQueue();
     config = new OzoneConfiguration();
+    config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
   }
 
   @Test
@@ -177,7 +176,7 @@ public class TestSCMSafeModeManager {
         HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
     conf.setDouble(HddsConfigKeys.
         HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -199,7 +198,7 @@ public class TestSCMSafeModeManager {
           0.9);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -217,7 +216,7 @@ public class TestSCMSafeModeManager {
           200);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -234,7 +233,7 @@ public class TestSCMSafeModeManager {
       conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForSafeModePercent");
@@ -258,7 +257,7 @@ public class TestSCMSafeModeManager {
 
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
     SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
-        mockNodeManager, queue, null);
+        mockNodeManager, queue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config);
@@ -302,12 +301,12 @@ public class TestSCMSafeModeManager {
     // we shall get an event when datanode is registered. In that case,
     // validate will return true, and add this to validatedRules.
     if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
-      firePipelineEvent(pipelines.get(0));
+      firePipelineEvent(pipelineManager, pipelines.get(0));
     }
 
     for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
-        oneReplicaThresholdCount); i++) {
-      firePipelineEvent(pipelines.get(i));
+        Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+      firePipelineEvent(pipelineManager, pipelines.get(i));
 
       if (i < healthyPipelineThresholdCount) {
         checkHealthy(i + 1);
@@ -352,15 +351,11 @@ public class TestSCMSafeModeManager {
         1000,  5000);
   }
 
-  private void firePipelineEvent(Pipeline pipeline) throws Exception {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-            reportBuilder.build()));
+  private void firePipelineEvent(SCMPipelineManager pipelineManager,
+      Pipeline pipeline) throws Exception {
+    pipelineManager.openPipeline(pipeline.getId());
+    queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+        pipelineManager.getPipeline(pipeline.getId()));
   }
 
 
@@ -479,7 +474,7 @@ public class TestSCMSafeModeManager {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, queue, null);
+          nodeManager, queue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -490,10 +485,6 @@ public class TestSCMSafeModeManager {
       Pipeline pipeline = pipelineManager.createPipeline(
           HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
-      PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-          .newBuilder();
-      reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-          .setPipelineID(pipeline.getId().getProtobuf()));
 
       scmSafeModeManager = new SCMSafeModeManager(
           config, containers, pipelineManager, queue);
@@ -502,10 +493,8 @@
           HddsTestUtils.createNodeRegistrationContainerReport(containers));
       assertTrue(scmSafeModeManager.getInSafeMode());
 
-      // Trigger the processed pipeline report event
-      queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-              reportBuilder.build()));
+      // Trigger the open pipeline event
+      firePipelineEvent(pipelineManager, pipeline);
 
       GenericTestUtils.waitFor(() -> {
         return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index e4f1a37..378a1a6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -164,7 +164,9 @@ public class TestContainerStateManagerIntegration {
       }
     }
 
-    cluster.restartStorageContainerManager(true);
+    // Restarting SCM does not trigger the container reports needed to
+    // satisfy the safe mode exit rule, so do not wait for safe mode exit.
+    cluster.restartStorageContainerManager(false);
 
     List<ContainerInfo> result = cluster.getStorageContainerManager()
         .getContainerManager().listContainer(null, 100);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..21fa7bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
         TimeUnit.MILLISECONDS);
-    pipelineDestroyTimeoutInMillis = 5000;
+    pipelineDestroyTimeoutInMillis = 1000;
     conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
         pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
     cluster.waitForClusterToBeReady();
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep(5000);
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 4b3d5d6..924dce1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -45,7 +46,9 @@ public class TestRatisPipelineProvider {
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
-    stateManager = new PipelineStateManager(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    stateManager = new PipelineStateManager(conf);
     provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, new OzoneConfiguration());
   }
@@ -57,7 +60,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-            Pipeline.PipelineState.OPEN);
+            Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
@@ -68,7 +71,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-            Pipeline.PipelineState.OPEN);
+            Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -80,7 +83,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -94,7 +97,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -189,7 +192,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..5d934eb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -61,6 +63,8 @@ public class TestSCMPipelineManager {
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+        "false");
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
     if (!folderExisted) {
       throw new IOException("Unable to create test directory path");
@@ -76,7 +80,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testPipelineReload() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -93,7 +97,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -116,7 +120,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testRemovePipeline() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -134,7 +138,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should not be able to load removed pipelines
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     try {
       pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
@@ -150,7 +154,7 @@ public class TestSCMPipelineManager {
   public void testPipelineReport() throws IOException {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -165,10 +169,11 @@ public class TestSCMPipelineManager {
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+
     Assert
         .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     Assert
-        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // get pipeline report from each dn in the pipeline
     PipelineReportHandler pipelineReportHandler =
@@ -217,7 +222,7 @@ public class TestSCMPipelineManager {
     MockNodeManager nodeManagerMock = new MockNodeManager(true,
         20);
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
             pipelineManager.getStateManager(), conf);
@@ -226,9 +231,9 @@ public class TestSCMPipelineManager {
 
     MetricsRecordBuilder metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    long numPipelineCreated = getLongCounter("NumPipelineCreated",
+    long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
         metrics);
-    Assert.assertTrue(numPipelineCreated == 0);
+    Assert.assertTrue(numPipelineAllocated == 0);
 
     // 3 DNs are unhealthy.
     // Create 5 pipelines (Use up 15 Datanodes)
@@ -241,8 +246,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     long numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -261,8 +266,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -272,7 +277,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     final PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..b06a2fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -143,11 +143,16 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      final boolean isReady = healthy == hddsDatanodes.size();
+      final boolean isNodeReady = healthy == hddsDatanodes.size();
+      final boolean exitSafeMode = !scm.isInSafeMode();
+
       LOG.info("{}. Got {} of {} DN Heartbeats.",
-          isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+          isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+          healthy, hddsDatanodes.size());
+      LOG.info(exitSafeMode? "Cluster is out of safe mode" :
+              "Waiting for cluster to exit safe mode",
           healthy, hddsDatanodes.size());
-      return isReady;
+      return isNodeReady && exitSafeMode;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
@@ -615,7 +620,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (hbInterval.isPresent()) {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             hbInterval.get(), TimeUnit.MILLISECONDS);
-
       } else {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             DEFAULT_HB_INTERVAL_MS,
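
Note: a recurring pattern across these test changes is that unit tests which
cannot stand up real datanode pipelines disable SCM's background pipeline
creation, so safe mode does not block them. A minimal configuration sketch
using the keys exercised in this patch:

    OzoneConfiguration conf = new OzoneConfiguration();
    // Keep SCM from creating pipelines while in safe mode; the tests
    // drive pipeline state transitions explicitly instead.
    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
    // Or skip the pipeline availability check entirely.
    conf.setBoolean(
        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, false);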


---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-help@hadoop.apache.org