You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2022/10/13 06:36:06 UTC

[ozone] 01/01: Merge branch 'master' into HDDS-6517-Snapshot

This is an automated email from the ASF dual-hosted git repository.

nanda pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 4be3864675ad98801cda5caabde29734420bc662
Merge: 6eb19a7710 4e364001ac
Author: Nandakumar <na...@apache.org>
AuthorDate: Thu Oct 13 08:56:23 2022 +0530

    Merge branch 'master' into HDDS-6517-Snapshot

 .github/workflows/close-pending.yaml               |    2 +-
 .github/workflows/comments.yaml                    |    2 +-
 .github/workflows/post-commit.yml                  |   26 +-
 dev-support/annotations/pom.xml                    |  114 --
 .../services/javax.annotation.processing.Processor |   17 -
 dev-support/ci/pr_title_check.bats                 |    8 +
 dev-support/ci/pr_title_check.sh                   |    2 +
 hadoop-hdds/annotations/pom.xml                    |   34 +
 .../annotations/ReplicateAnnotationProcessor.java  |    3 +
 .../RequestFeatureValidatorProcessor.java          |    0
 .../org/apache/ozone/annotations/package-info.java |    0
 hadoop-hdds/client/pom.xml                         |    2 +-
 .../hadoop/hdds/scm/ContainerClientMetrics.java    |  138 +++
 .../hadoop/hdds/scm/storage/BlockInputStream.java  |    2 +-
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |   10 +-
 .../hadoop/hdds/scm/storage/ChunkInputStream.java  |    3 +-
 .../hdds/scm/storage/ECBlockOutputStream.java      |    6 +-
 .../hdds/scm/storage/RatisBlockOutputStream.java   |    7 +-
 .../hadoop/ozone/client/io/ECBlockInputStream.java |    2 +-
 .../client/io/ECBlockReconstructedInputStream.java |    2 +-
 .../hdds/scm/TestContainerClientMetrics.java       |  116 ++
 .../storage/TestBlockOutputStreamCorrectness.java  |    4 +-
 .../hdds/scm/storage/TestChunkInputStream.java     |    9 +-
 .../ozone/client/io/TestECBlockInputStream.java    |    3 +-
 hadoop-hdds/common/pom.xml                         |   12 +-
 .../hdds/client/DefaultReplicationConfig.java      |    2 +-
 .../hadoop/hdds/client/ECReplicationConfig.java    |    2 +-
 .../org/apache/hadoop/hdds/client/OzoneQuota.java  |    9 -
 .../org/apache/hadoop/hdds/client/QuotaList.java   |   19 +-
 .../hadoop/hdds/client/RatisReplicationConfig.java |    2 +-
 .../hadoop/hdds/client/ReplicationConfig.java      |    4 +-
 .../hdds/client/ReplicationConfigValidator.java    |   32 +-
 .../hadoop/hdds/protocol/DatanodeDetails.java      |   11 +
 .../apache/hadoop/hdds/recon/ReconConfigKeys.java  |    5 +-
 .../apache/hadoop/hdds/scm/PlacementPolicy.java    |   18 +-
 .../java/org/apache/hadoop/hdds/scm/ScmConfig.java |    3 +-
 .../protocol/StorageContainerLocationProtocol.java |    2 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |    8 +
 .../org/apache/hadoop/ozone/common/Checksum.java   |    3 +-
 .../services/javax.annotation.processing.Processor |    2 +
 .../common/src/main/resources/ozone-default.xml    |   73 +-
 .../hdds/client/TestECReplicationConfig.java       |   29 +-
 .../hadoop/hdds/client/TestReplicationConfig.java  |   36 +-
 .../client/TestReplicationConfigValidator.java     |   14 +-
 .../hadoop/hdds/conf/SimpleConfiguration.java      |   13 +
 .../hadoop/hdds/conf/TestOzoneConfiguration.java   |   10 +
 .../hadoop/hdds/scm/pipeline/MockPipeline.java     |    2 +
 .../hadoop/hdds/conf/ConfigFileGenerator.java      |    3 +
 .../org/apache/hadoop/hdds/conf/ConfigType.java    |    3 +-
 .../hdds/conf/ConfigurationReflectionUtil.java     |    9 +
 .../hadoop/hdds/conf/ConfigurationTarget.java      |    4 +
 .../hadoop/hdds/conf/ConfigurationExample.java     |   13 +
 .../hdds/conf/TestConfigurationReflectionUtil.java |    4 +
 .../dev-support/findbugsExcludeFile.xml            |    2 +-
 hadoop-hdds/container-service/pom.xml              |    2 +-
 .../datanode/metadata/DatanodeCRLStoreImpl.java    |   13 +-
 .../container/common/report/ReportManager.java     |    4 +-
 .../container/common/report/ReportPublisher.java   |    6 +-
 .../common/report/ReportPublisherFactory.java      |    6 +-
 .../common/statemachine/DatanodeConfiguration.java |  107 ++
 .../common/statemachine/EndpointStateMachine.java  |   16 +-
 .../statemachine/EndpointStateMachineMBean.java    |    2 +
 .../common/statemachine/StateContext.java          |   56 +-
 .../commandhandler/DeleteBlocksCommandHandler.java |   32 +-
 .../DeleteContainerCommandHandler.java             |   11 +-
 .../states/endpoint/HeartbeatEndpointTask.java     |    6 +-
 .../container/common/utils/HddsVolumeUtil.java     |    8 +-
 .../common/utils/db/DatanodeDBProfile.java         |   21 +-
 .../common/volume/AvailableSpaceFilter.java        |   84 ++
 .../volume/CapacityVolumeChoosingPolicy.java       |   42 +-
 .../ozone/container/common/volume/HddsVolume.java  |   14 +-
 .../volume/RoundRobinVolumeChoosingPolicy.java     |   22 +-
 .../common/volume/VolumeChoosingUtil.java          |   49 +
 .../container/common/volume/VolumeIOStats.java     |   10 +-
 .../container/common/volume/VolumeInfoMetrics.java |    5 +
 .../ECReconstructionCoordinator.java               |    8 +-
 .../container/keyvalue/KeyValueContainer.java      |    9 +-
 .../keyvalue/impl/ChunkManagerFactory.java         |   12 +-
 .../background/BlockDeletingService.java           |   27 +-
 .../container/metadata/AbstractDatanodeStore.java  |   40 +-
 .../metadata/DatanodeSchemaThreeDBDefinition.java  |    2 +-
 .../ozoneimpl/AbstractContainerScanner.java        |  136 +++
 ...s.java => AbstractContainerScannerMetrics.java} |   42 +-
 .../container/ozoneimpl/ContainerDataScanner.java  |  135 +--
 .../ozoneimpl/ContainerDataScannerMetrics.java     |   69 ++
 .../ozoneimpl/ContainerDataScrubberMetrics.java    |  120 --
 .../ozoneimpl/ContainerMetadataScanner.java        |  112 +-
 .../ozoneimpl/ContainerMetadataScannerMetrics.java |   44 +
 ...ion.java => ContainerScannerConfiguration.java} |   12 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |    9 +-
 .../upgrade/DatanodeSchemaV3FinalizeAction.java    |    3 +-
 .../protocol/commands/CommandForDatanode.java      |    4 +-
 .../hadoop/ozone/protocol/commands/SCMCommand.java |    4 +-
 .../webapps/hddsDatanode/dn-overview.html          |    2 +
 .../common/report/TestReportPublisher.java         |    6 +-
 .../common/statemachine/TestStateContext.java      |   22 +-
 .../container/common/utils/TestHddsVolumeUtil.java |    8 +-
 .../volume/TestCapacityVolumeChoosingPolicy.java   |   21 +-
 .../volume/TestRoundRobinVolumeChoosingPolicy.java |   33 +-
 .../TestVolumeIOStatsWithPrometheusSink.java       |   92 ++
 .../container/keyvalue/TestKeyValueContainer.java  |   36 -
 .../keyvalue/TestKeyValueContainerCheck.java       |   15 +-
 ...java => TestContainerScannerConfiguration.java} |   28 +-
 ...trics.java => TestContainerScannerMetrics.java} |   20 +-
 hadoop-hdds/docs/content/concept/Recon.md          |    2 +-
 hadoop-hdds/docs/content/concept/Recon.zh.md       |    2 +-
 hadoop-hdds/docs/content/security/SecuringTDE.md   |   12 +-
 .../docs/content/security/SecuringTDE.zh.md        |    6 +-
 hadoop-hdds/framework/pom.xml                      |   16 +-
 ...inerLocationProtocolClientSideTranslatorPB.java |    2 +-
 .../org/apache/hadoop/hdds/server/OzoneAdmins.java |   87 ++
 .../hadoop/hdds/server/http/BaseHttpServer.java    |    8 +-
 .../hadoop/hdds/server/http/HttpServer2.java       |    3 +
 .../hdds/server/http/HttpServer2Metrics.java       |   94 ++
 .../hadoop/hdds/utils/DBCheckpointServlet.java     |   11 +-
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |   30 +-
 .../apache/hadoop/hdds/utils/db/CodecRegistry.java |   22 +-
 .../org/apache/hadoop/hdds/utils/db/DBProfile.java |   19 +-
 .../hadoop/hdds/utils/db/DBStoreBuilder.java       |   57 +-
 .../hadoop/hdds/utils/db/RDBCheckpointManager.java |    7 +-
 .../hadoop/hdds/utils/db/RDBSstFileWriter.java     |   18 +-
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |   25 +-
 .../apache/hadoop/hdds/utils/db/RocksDatabase.java |   45 +-
 .../apache/hadoop/hdds/utils/db/TableConfig.java   |   14 +-
 .../db/managed/ManagedBlockBasedTableConfig.java   |   74 ++
 .../db/managed/ManagedColumnFamilyOptions.java     |   37 +-
 .../utils/db/managed/ManagedRocksObjectUtils.java  |    2 +-
 ...dColumnFamilyOptions.java => ManagedSlice.java} |   28 +-
 .../server/http/TestHttpServer2MetricsTest.java    |   94 ++
 hadoop-hdds/hadoop-dependency-client/pom.xml       |   14 +-
 hadoop-hdds/hadoop-dependency-server/pom.xml       |   24 +
 hadoop-hdds/pom.xml                                |    7 +
 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java  |   51 +-
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java |   12 +-
 .../scm/block/DeletedBlockLogStateManagerImpl.java |    6 -
 .../hdds/scm/container/ContainerReplica.java       |    2 +-
 .../scm/container/ContainerStateManagerImpl.java   |  178 +--
 .../scm/container/balancer/ContainerBalancer.java  |  179 +--
 .../balancer/ContainerBalancerMetrics.java         |   29 +
 .../algorithms/SCMContainerPlacementCapacity.java  |   12 +-
 .../algorithms/SCMContainerPlacementRackAware.java |   14 +-
 .../SCMContainerPlacementRackScatter.java          |  264 +++--
 .../algorithms/SCMContainerPlacementRandom.java    |   13 +-
 .../replication/ContainerCheckRequest.java         |  120 ++
 .../replication/ContainerHealthCheck.java          |   35 -
 .../replication/ContainerHealthResult.java         |    9 +-
 .../replication/ECContainerHealthCheck.java        |   86 --
 .../replication/ECContainerReplicaCount.java       |   48 +-
 .../replication/ECOverReplicationHandler.java      |   22 +-
 .../replication/ECUnderReplicationHandler.java     |  191 ++-
 .../replication/LegacyReplicationManager.java      |    4 +-
 .../replication/OverReplicatedProcessor.java       |   37 +-
 .../container/replication/ReplicationManager.java  |  252 ++--
 .../container/replication/ReplicationQueue.java    |   73 ++
 .../replication/UnderReplicatedProcessor.java      |   37 +-
 .../replication/health/AbstractCheck.java          |   71 ++
 .../ClosedWithMismatchedReplicasHandler.java       |   89 ++
 .../health/ClosingContainerHandler.java            |   62 +
 .../health/ECReplicationCheckHandler.java          |  134 +++
 .../container/replication/health/HealthCheck.java  |   56 +
 .../replication/health/OpenContainerHandler.java   |   81 ++
 .../health/QuasiClosedContainerHandler.java        |  131 ++
 .../container/replication/health/package-info.java |   10 +-
 .../scm/container/states/ContainerAttribute.java   |   14 +-
 .../hdds/scm/ha/SCMHADBTransactionBufferImpl.java  |    3 +
 .../apache/hadoop/hdds/scm/ha/StatefulService.java |   12 +-
 .../apache/hadoop/hdds/scm/ha/io/CodecFactory.java |   11 +-
 .../apache/hadoop/hdds/scm/ha/io/EnumCodec.java    |    2 +-
 .../hdds/scm/ha/io/GeneratedMessageCodec.java      |   12 +-
 .../apache/hadoop/hdds/scm/ha/io/ListCodec.java    |    2 +-
 .../hdds/scm/metadata/SCMMetadataStoreImpl.java    |   51 +-
 .../hdds/scm/metadata/SCMMetadataStoreMetrics.java |  116 ++
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   10 +-
 .../hadoop/hdds/scm/server/SCMCertStore.java       |    5 +-
 .../hdds/scm/server/SCMClientProtocolServer.java   |    2 +-
 .../hdds/scm/server/SCMDBCheckpointServlet.java    |    1 +
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |    4 +-
 .../hdds/scm/server/StorageContainerManager.java   |   52 +-
 .../hadoop/hdds/scm/block/TestDeletedBlockLog.java |  154 ++-
 .../container/balancer/TestContainerBalancer.java  |  189 +--
 .../algorithms/TestContainerPlacementFactory.java  |    4 +-
 .../TestSCMContainerPlacementRackScatter.java      |   76 +-
 .../container/replication/ReplicationTestUtil.java |   56 +-
 .../replication/TestECContainerReplicaCount.java   |   84 +-
 .../replication/TestECOverReplicationHandler.java  |    5 +-
 .../replication/TestECUnderReplicationHandler.java |  100 +-
 .../replication/TestOverReplicatedProcessor.java   |    7 +-
 .../replication/TestReplicationManager.java        |  102 +-
 .../replication/TestUnderReplicatedProcessor.java  |    7 +-
 .../TestClosedWithMismatchedReplicasHandler.java   |  217 ++++
 .../health/TestClosingContainerHandler.java        |  208 ++++
 .../TestECReplicationCheckHandler.java}            |  199 +++-
 .../health/TestOpenContainerHandler.java           |  168 +++
 .../health/TestQuasiClosedContainerHandler.java    |  238 ++++
 .../container/replication/health/package-info.java |   11 +-
 .../scm/metadata/TestSCMMetadataStoreImpl.java     |   77 ++
 hadoop-hdds/test-utils/pom.xml                     |    4 +-
 .../org/apache/ozone/test/GenericTestUtils.java    |   42 +
 hadoop-hdds/tools/pom.xml                          |    6 +-
 .../hdds/scm/cli/container/InfoSubcommand.java     |    6 +-
 .../hdds/scm/cli/container/TestInfoSubCommand.java |   22 +-
 hadoop-ozone/client/pom.xml                        |    6 +-
 .../client/checksum/BaseFileChecksumHelper.java    |   40 +-
 .../checksum/ReplicatedFileChecksumHelper.java     |    4 +-
 .../ozone/client/io/BlockOutputStreamEntry.java    |   19 +-
 .../client/io/BlockOutputStreamEntryPool.java      |   20 +-
 .../ozone/client/io/ECBlockOutputStreamEntry.java  |   16 +-
 .../client/io/ECBlockOutputStreamEntryPool.java    |    7 +-
 .../hadoop/ozone/client/io/ECKeyOutputStream.java  |    4 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |   28 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |    7 +-
 .../checksum/TestReplicatedFileChecksumHelper.java |   11 +-
 hadoop-ozone/common/pom.xml                        |   12 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   39 +
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   11 +
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   14 +
 ...OzoneManagerProtocolClientSideTranslatorPB.java |   20 +
 hadoop-ozone/csi/pom.xml                           |    6 +-
 .../dev-support/checks/_mvn_unit_report.sh         |    2 +-
 hadoop-ozone/dev-support/checks/rat.sh             |    4 +-
 .../compose/ozone-topology/docker-compose.yaml     |   15 +
 .../src/main/compose/ozone-topology/docker-config  |    5 +
 .../dist/src/main/compose/ozone-topology/test.sh   |    2 +
 hadoop-ozone/dist/src/main/license/bin/LICENSE.txt |   10 +-
 hadoop-ozone/dist/src/main/license/bin/NOTICE.txt  |    2 +-
 hadoop-ozone/dist/src/main/license/jar-report.txt  |   12 +-
 .../src/main/smoketest/admincli/pipeline.robot     |    5 +-
 .../dist/src/main/smoketest/ec/basic.robot         |    6 +-
 .../src/main/smoketest/freon/echoRPCLoad.robot     |   42 +
 .../dist/src/main/smoketest/recon/recon-api.robot  |   14 +
 .../main/smoketest/recon/recon-fso-nssummary.robot |  146 +++
 .../dist/src/main/smoketest/s3/commonawslib.robot  |    2 +
 .../dist/src/main/smoketest/s3/freon.robot         |   44 +
 .../dist/src/main/smoketest/s3/objectputget.robot  |    6 +
 hadoop-ozone/dist/src/shell/ozone/ozone            |    3 +-
 .../hadoop/ozone/MiniOzoneLoadGenerator.java       |   22 +-
 .../hadoop/ozone/TestMiniChaosOzoneCluster.java    |   28 +-
 hadoop-ozone/integration-test/pom.xml              |   94 +-
 .../fs/contract/AbstractContractUnbufferTest.java  |  160 ---
 .../hadoop/fs/ozone/TestOzoneFSInputStream.java    |   80 +-
 .../hadoop/fs/ozone/TestOzoneFileSystem.java       |  203 ++++
 .../hadoop/fs/ozone/TestRootedOzoneFileSystem.java |  247 ++++
 .../hdds/scm/TestSCMDbCheckpointServlet.java       |   31 +-
 .../apache/hadoop/hdds/scm/TestSCMSnapshot.java    |   32 +-
 .../metrics/TestSCMContainerManagerMetrics.java    |   81 +-
 .../hdds/scm/pipeline/TestMultiRaftSetup.java      |   42 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   40 +-
 .../hadoop/hdds/scm/pipeline/TestNodeFailure.java  |   22 +-
 .../hadoop/hdds/scm/pipeline/TestSCMRestart.java   |   33 +-
 .../safemode/TestSCMSafeModeWithPipelineRules.java |   35 +-
 .../java/org/apache/hadoop/ozone/TestDataUtil.java |   21 +-
 .../client/rpc/read/TestChunkInputStream.java      |   20 +
 .../ozone/container/TestECContainerRecovery.java   |   84 +-
 .../TestDataScanner.java}                          |   31 +-
 .../apache/hadoop/ozone/om/TestKeyManagerImpl.java |   11 +-
 .../om/{TestOmLDBCli.java => TestLDBCli.java}      |  124 +-
 .../hadoop/ozone/om/TestOMDbCheckpointServlet.java |    7 +-
 .../hadoop/ozone/scm/TestAllocateContainer.java    |   39 +-
 .../hadoop/ozone/scm/TestCloseContainer.java       |   26 +-
 .../hadoop/ozone/scm/TestContainerSmallFile.java   |   84 +-
 .../scm/TestGetCommittedBlockLengthAndPutKey.java  |   47 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   31 +-
 .../hadoop/ozone/scm/TestSCMNodeManagerMXBean.java |   30 +-
 .../hadoop/ozone/scm/TestXceiverClientGrpc.java    |   22 +-
 .../hadoop/ozone/scm/TestXceiverClientManager.java |  103 +-
 .../hadoop/ozone/scm/TestXceiverClientMetrics.java |   19 +-
 .../scm/node/TestDecommissionAndMaintenance.java   |  210 ++--
 .../hadoop/ozone/scm/node/TestQueryNode.java       |   29 +-
 .../scm/pipeline/TestPipelineManagerMXBean.java    |   35 +-
 .../TestSCMPipelineBytesWrittenMetrics.java        |   28 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   69 +-
 .../hadoop/ozone/shell/TestOzoneShellHA.java       |   21 +-
 .../src/test/resources/testSequenceFile            |  Bin 0 -> 96 bytes
 .../src/main/proto/OmClientProtocol.proto          |   33 +-
 .../hadoop/ozone/om/codec/TestOmKeyInfoCodec.java  |   45 +-
 hadoop-ozone/ozone-manager/pom.xml                 |   16 +
 .../org/apache/hadoop/ozone/om/KeyManagerImpl.java |  665 +----------
 .../hadoop/ozone/om/OMDBCheckpointServlet.java     |    6 +-
 .../java/org/apache/hadoop/ozone/om/OMMXBean.java  |    2 +
 .../hadoop/ozone/om/OMPerformanceMetrics.java      |   92 ++
 .../apache/hadoop/ozone/om/OmMetadataReader.java   |    4 +-
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  |    4 +-
 .../apache/hadoop/ozone/om/OzoneConfigUtil.java    |   61 +
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  153 ++-
 .../java/org/apache/hadoop/ozone/om/ScmClient.java |   81 +-
 .../om/hashcodegenerator/OMHashCodeGenerator.java  |    2 +-
 .../StringOMHashCodeGeneratorImpl.java             |    2 +-
 .../ozone/om/lock/OBSKeyPathLockStrategy.java      |   38 +-
 .../hadoop/ozone/om/lock/OzoneLockStrategy.java    |   15 +-
 .../ozone/om/lock/RegularBucketLockStrategy.java   |    8 +-
 .../RangerClientMultiTenantAccessController.java   |   27 +-
 .../ozone/om/ratis/OzoneManagerRatisServer.java    |   13 +
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |    3 +
 .../hadoop/ozone/om/request/OMEchoRPCRequest.java} |   25 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   11 +-
 .../hadoop/ozone/om/request/key/OMKeyRequest.java  |    6 +
 .../request/s3/security/S3SecretRequestHelper.java |    2 +-
 .../protocolPB/OzoneManagerRequestHandler.java     |   29 +
 .../ozone/security/acl/OzoneNativeAuthorizer.java  |   22 +-
 .../webapps/ozoneManager/om-overview.html          |   20 +
 .../resources/webapps/ozoneManager/ozoneManager.js |   30 +
 .../org/apache/hadoop/ozone/om/OmTestManagers.java |    2 +-
 .../hadoop/ozone/om/TestOzoneConfigUtil.java       |   42 +
 .../org/apache/hadoop/ozone/om/TestScmClient.java  |  191 +++
 .../om/request/key/TestOMKeyCreateRequest.java     |   47 +-
 .../request/key/TestOMKeyCreateRequestWithFSO.java |    5 +
 .../ozone/om/request/key/TestOMKeyRequest.java     |    5 +-
 .../s3/security/TestS3GetSecretRequest.java        |  206 ++--
 .../ozone/om/response/TestCleanupTableInfo.java    |    3 +
 .../security/acl/TestOzoneAdministrators.java      |   66 +-
 .../security/acl/TestOzoneNativeAuthorizer.java    |    8 +-
 .../hadoop/ozone/security/acl/TestParentAcl.java   |    3 +-
 .../hadoop/ozone/security/acl/TestVolumeOwner.java |    3 +-
 hadoop-ozone/ozonefs-common/pom.xml                |    5 +-
 .../hadoop/fs/ozone/BasicOzoneFileSystem.java      |  138 ++-
 .../fs/ozone/BasicRootedOzoneFileSystem.java       |  154 ++-
 hadoop-ozone/ozonefs-hadoop2/pom.xml               |    6 +-
 hadoop-ozone/ozonefs-hadoop3/pom.xml               |    6 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |    6 +
 hadoop-ozone/ozonefs-shaded/pom.xml                |    4 +
 hadoop-ozone/ozonefs/pom.xml                       |    5 +-
 .../hadoop/fs/ozone/RootedOzoneFileSystem.java     |    6 +
 .../hadoop/ozone/recon/ReconControllerModule.java  |    4 +-
 .../hadoop/ozone/recon/ReconRestServletModule.java |    2 +-
 .../hadoop/ozone/recon/api/NSSummaryEndpoint.java  |  929 +--------------
 .../hadoop/ozone/recon/api/PipelineEndpoint.java   |   57 +-
 .../ozone/recon/api/filters/ReconAdminFilter.java  |   12 +-
 .../recon/api/handlers/BucketEntityHandler.java    |  152 +++
 .../ozone/recon/api/handlers/BucketHandler.java    |  207 ++++
 .../recon/api/handlers/DirectoryEntityHandler.java |  150 +++
 .../ozone/recon/api/handlers/EntityHandler.java    |  342 ++++++
 .../ozone/recon/api/handlers/FSOBucketHandler.java |  279 +++++
 .../ozone/recon/api/handlers/KeyEntityHandler.java |   93 ++
 .../recon/api/handlers/RootEntityHandler.java      |  176 +++
 .../recon/api/handlers/UnknownEntityHandler.java   |   82 ++
 .../recon/api/handlers/VolumeEntityHandler.java    |  160 +++
 .../EntityType.java => handlers/package-info.java} |   13 +-
 .../ozone/recon/api/types/ContainerKeyPrefix.java  |    9 +
 .../hadoop/ozone/recon/api/types/EntityType.java   |   85 +-
 ...ainerKeyPrefix.java => KeyPrefixContainer.java} |   50 +-
 .../hadoop/ozone/recon/api/types/NSSummary.java    |    7 +-
 .../recon/recovery/ReconOmMetadataManagerImpl.java |   10 +-
 .../hadoop/ozone/recon/scm/ReconNodeManager.java   |   18 +-
 .../scm/ReconStorageContainerManagerFacade.java    |   11 +-
 .../recon/spi/ReconContainerMetadataManager.java   |   17 +
 .../recon/spi/impl/KeyPrefixContainerCodec.java    |   92 ++
 .../impl/ReconContainerMetadataManagerImpl.java    |  304 +++--
 .../ozone/recon/spi/impl/ReconDBDefinition.java    |   12 +-
 .../ozone/recon/spi/impl/ReconDBProvider.java      |   11 +-
 .../ozone/recon/tasks/ContainerKeyMapperTask.java  |   94 +-
 .../hadoop/ozone/recon/tasks/NSSummaryTask.java    |  206 +---
 .../ozone/recon/tasks/NSSummaryTaskWithFSO.java    |  217 ++++
 .../webapps/recon/ozone-recon-web/api/db.json      |   46 +-
 .../webapps/recon/ozone-recon-web/api/routes.json  |    3 +-
 .../components/autoReloadPanel/autoReloadPanel.tsx |   44 +-
 .../webapps/recon/ozone-recon-web/src/routes.tsx   |    8 +
 .../src/views/datanodes/datanodes.tsx              |   29 +-
 .../src/views/overview/overview.tsx                |   35 +-
 .../src/views/pipelines/pipelines.less             |    2 +-
 .../src/views/pipelines/pipelines.tsx              |   33 +-
 .../ozone/recon/OMMetadataManagerTestUtils.java    |   44 +-
 .../ozone/recon/api/TestNSSummaryEndpoint.java     |  663 -----------
 .../recon/api/TestNSSummaryEndpointWithFSO.java    | 1249 ++++++++++++++++++++
 .../ozone/recon/api/filters/TestAdminFilter.java   |   23 +
 .../ozone/recon/fsck/TestContainerHealthTask.java  |    3 +-
 .../recovery/TestReconOmMetadataManagerImpl.java   |   11 +
 .../TestReconContainerMetadataManagerImpl.java     |   25 +
 .../ozone/recon/tasks/TestNSSummaryTask.java       |  541 ---------
 .../recon/tasks/TestNSSummaryTaskWithFSO.java      |  604 ++++++++++
 hadoop-ozone/s3gateway/pom.xml                     |   42 -
 .../hadoop/ozone/s3/endpoint/BucketEndpoint.java   |   12 +-
 .../hadoop/ozone/s3/endpoint/EndpointBase.java     |    7 +
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |   74 +-
 .../ozone/s3/signature/StringToSignProducer.java   |    1 +
 .../hadoop/ozone/s3/endpoint/TestObjectGet.java    |  153 ++-
 .../ozone/s3/metrics/TestS3GatewayMetrics.java     |   10 +
 .../hadoop/ozone/debug/DBDefinitionFactory.java    |    3 +-
 .../org/apache/hadoop/ozone/debug/DBScanner.java   |   61 +-
 .../ozone/debug/container/ContainerCommands.java   |   12 +
 .../ozone/debug/container/ExportSubcommand.java    |   35 +-
 .../hadoop/ozone/freon/BaseFreonGenerator.java     |   10 +-
 .../java/org/apache/hadoop/ozone/freon/Freon.java  |    5 +-
 .../ozone/freon/FreonReplicationOptions.java       |    5 +-
 .../hadoop/ozone/freon/OmRPCLoadGenerator.java     |  120 ++
 .../hadoop/ozone/freon/S3BucketGenerator.java      |   73 ++
 .../hadoop/ozone/freon/S3EntityGenerator.java      |   65 +
 .../apache/hadoop/ozone/freon/S3KeyGenerator.java  |   47 +-
 .../freon/containergenerator/GeneratorOm.java      |   14 +-
 .../hadoop/ozone/shell/ReplicationOptions.java     |   18 +-
 .../ozone/shell/ShellReplicationOptions.java       |    3 +-
 .../ozone/shell/bucket/InfoBucketHandler.java      |    2 +-
 .../ozone/shell/bucket/ListBucketHandler.java      |   17 +-
 pom.xml                                            |   44 +-
 393 files changed, 14348 insertions(+), 6595 deletions(-)

diff --cc hadoop-hdds/common/src/main/resources/ozone-default.xml
index aba10935e9,4d059d2d60..c10344756a
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@@ -3296,13 -3338,25 +3347,33 @@@
        will create intermediate directories.
      </description>
    </property>
 +  <property>
 +    <name>ozone.om.snapshot.cache.max.size</name>
 +    <value>10</value>
 +    <tag>OZONE, OM</tag>
 +    <description>
 +      Size of the OM Snapshot LRU cache.  This is the maximum number of open OM Snapshot RocksDb instances
 +      that will be held in memory at any time.
 +    </description>
 +  </property>
+ 
+   <property>
+     <name>ozone.om.container.location.cache.size</name>
+     <value>100000</value>
+     <tag>OZONE, OM</tag>
+     <description>
+       The size of the container locations cache in Ozone Manager. This cache allows Ozone Manager to populate
+       block locations in key-read responses without calling SCM, thus increases Ozone Manager read performance.
+     </description>
+   </property>
+ 
+   <property>
+     <name>ozone.om.container.location.cache.ttl</name>
+     <value>360m</value>
+     <tag>OZONE, OM</tag>
+     <description>
+       The time to live for container location cache in Ozone.
+     </description>
+   </property>
+ 
 -
  </configuration>
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index fd90343ae4,aca30a7b20..5478e8658a
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@@ -90,7 -90,7 +90,8 @@@ public final class DBStoreBuilder 
    private RocksDBConfiguration rocksDBConfiguration;
    // Flag to indicate if the RocksDB should be opened readonly.
    private boolean openReadOnly = false;
 +  private int maxFSSnapshots = 0;
+   private final DBProfile defaultCfProfile;
  
    /**
     * Create DBStoreBuilder from a generic DBDefinition.
@@@ -178,27 -175,26 +176,30 @@@
  
      Set<TableConfig> tableConfigs = makeTableConfigs();
  
-     if (rocksDBOption == null) {
-       rocksDBOption = getDefaultDBOptions(tableConfigs);
-     }
+     try {
+       if (rocksDBOption == null) {
+         rocksDBOption = getDefaultDBOptions(tableConfigs);
+       }
  
-     ManagedWriteOptions writeOptions = new ManagedWriteOptions();
-     writeOptions.setSync(rocksDBConfiguration.getSyncOption());
+       ManagedWriteOptions writeOptions = new ManagedWriteOptions();
+       writeOptions.setSync(rocksDBConfiguration.getSyncOption());
  
-     File dbFile = getDBFile();
-     if (!dbFile.getParentFile().exists()) {
-       throw new IOException("The DB destination directory should exist.");
-     }
+       File dbFile = getDBFile();
+       if (!dbFile.getParentFile().exists()) {
+         throw new IOException("The DB destination directory should exist.");
+       }
  
-     return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
-         registry, openReadOnly, maxFSSnapshots);
+       return new RDBStore(dbFile, rocksDBOption, writeOptions, tableConfigs,
 -          registry, openReadOnly, dbJmxBeanNameName);
++          registry, openReadOnly, maxFSSnapshots, dbJmxBeanNameName);
+     } finally {
+       tableConfigs.forEach(TableConfig::close);
+     }
    }
  
 +  public DBStoreBuilder setMaxFSSnapshots(int maxFSSnapshots) {
 +    this.maxFSSnapshots = maxFSSnapshots;
 +    return this;
 +  }
- 
    public DBStoreBuilder setName(String name) {
      dbname = name;
      return this;
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
index 6835fe559a,943d58ef1d..f227ab81fd
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
@@@ -95,14 -92,8 +96,18 @@@ public class RDBCheckpointManager imple
      return null;
    }
  
 +  /**
 +   * Create RocksDB snapshot by saving a checkpoint to a directory.
 +   *
 +   * @param parentDir The directory where the checkpoint needs to be created.
 +   * @return RocksDB specific Checkpoint information object.
 +   */
 +  public RocksDBCheckpoint createCheckpoint(String parentDir) {
 +    return createCheckpoint(parentDir, null);
 +  }
 +
+   @Override
+   public void close() throws IOException {
+     checkpoint.close();
+   }
  }
diff --cc hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 4b98213af1,be8cfb9900..9fabf5e9df
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@@ -60,36 -57,29 +60,40 @@@ public class RDBStore implements DBStor
    private ObjectName statMBeanName;
    private final RDBCheckpointManager checkPointManager;
    private final String checkpointsParentDir;
 +  private final String snapshotsParentDir;
    private final RDBMetrics rdbMetrics;
 +  private final RocksDBCheckpointDiffer rocksDBCheckpointDiffer;
+   private final String dbJmxBeanName;
  
    @VisibleForTesting
    public RDBStore(File dbFile, ManagedDBOptions options,
                    Set<TableConfig> families) throws IOException {
      this(dbFile, options, new ManagedWriteOptions(), families,
-         new CodecRegistry(), false, 1000);
 -        new CodecRegistry(), false, null);
++        new CodecRegistry(), false, 1000, null);
    }
  
++  @SuppressWarnings("parameternumber")
    public RDBStore(File dbFile, ManagedDBOptions dbOptions,
                    ManagedWriteOptions writeOptions, Set<TableConfig> families,
-                   CodecRegistry registry, boolean readOnly, int maxFSSnapshots)
-       throws IOException {
 -                  CodecRegistry registry, boolean readOnly,
++                  CodecRegistry registry, boolean readOnly, int maxFSSnapshots,
+                   String dbJmxBeanNameName) throws IOException {
      Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
      Preconditions.checkNotNull(families);
      Preconditions.checkArgument(!families.isEmpty());
      codecRegistry = registry;
      dbLocation = dbFile;
+     dbJmxBeanName = dbJmxBeanNameName == null ? dbFile.getName() :
+         dbJmxBeanNameName;
  
      try {
 +      rocksDBCheckpointDiffer =
 +          new RocksDBCheckpointDiffer(
 +              dbLocation.getAbsolutePath(), maxFSSnapshots,
 +          Paths.get(dbLocation.getParent(), "db.checkpoints").toString(),
 +          Paths.get(dbLocation.getParent(), "db.savedSSTFiles").toString(),
 +          dbLocation.getAbsolutePath(), 0, "Snapshot_");
 +      rocksDBCheckpointDiffer.setRocksDBForCompactionTracking(dbOptions);
 +
        db = RocksDatabase.open(dbFile, dbOptions, writeOptions,
            families, readOnly);
  
diff --cc hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index caa61e8d69,65a2f67883..ab7bacb05b
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@@ -814,8 -800,37 +819,42 @@@ public final class OmUtils 
      return printString.toString();
    }
  
 +  // Key points to entire bucket's snapshot
 +  public static boolean isBucketSnapshotIndicator(String key) {
 +    return key.startsWith(OM_SNAPSHOT_INDICATOR) && key.split("/").length == 2;
 +  }
++
+   public static String format(List<ServiceInfo> nodes, int port,
+                               String leaderId) {
+     StringBuilder sb = new StringBuilder();
+     // Ensuring OM's are printed in correct order
+     List<ServiceInfo> omNodes = nodes.stream()
+         .filter(node -> node.getNodeType() == HddsProtos.NodeType.OM)
+         .sorted(Comparator.comparing(ServiceInfo::getHostname))
+         .collect(Collectors.toList());
+     int count = 0;
+     for (ServiceInfo info : omNodes) {
+       // Printing only the OM's running
+       if (info.getNodeType() == HddsProtos.NodeType.OM) {
+         String role =
+             info.getOmRoleInfo().getNodeId().equals(leaderId) ? "LEADER" :
+                 "FOLLOWER";
+         sb.append(
+             String.format(
+                 " { HostName: %s | Node-Id: %s | Ratis-Port : %d | Role: %s} ",
+                 info.getHostname(),
+                 info.getOmRoleInfo().getNodeId(),
+                 port,
+                 role
+             ));
+         count++;
+       }
+     }
+     // Print Stand-alone if only one OM exists
+     if (count == 1) {
+       return "STANDALONE";
+     } else {
+       return sb.toString();
+     }
+   }
  }
diff --cc hadoop-ozone/dist/src/main/license/jar-report.txt
index 9581b6dbfb,ac5a3ab9a1..ba25a09545
--- a/hadoop-ozone/dist/src/main/license/jar-report.txt
+++ b/hadoop-ozone/dist/src/main/license/jar-report.txt
@@@ -255,7 -245,7 +250,8 @@@ share/ozone/lib/ratis-thirdparty-misc.j
  share/ozone/lib/ratis-tools.jar
  share/ozone/lib/re2j.jar
  share/ozone/lib/reflections.jar
 +share/ozone/lib/rocksdb-checkpoint-differ.jar
+ share/ozone/lib/reload4j.jar
  share/ozone/lib/rocksdbjni.jar
  share/ozone/lib/simpleclient_common.jar
  share/ozone/lib/simpleclient_dropwizard.jar
diff --cc hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index d46dda9ff3,88501120bf..5c9b3409a5
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@@ -122,7 -122,7 +122,10 @@@ enum Type 
  
    SetRangerServiceVersion = 107;
    RangerBGSync = 109;
-   CreateSnapshot = 110;
++
+   EchoRPC = 110;
++
++  CreateSnapshot = 111;
  }
  
  message OMRequest {
@@@ -227,7 -227,7 +230,11 @@@
  
    optional SetRangerServiceVersionRequest   SetRangerServiceVersionRequest = 107;
    optional RangerBGSyncRequest              RangerBGSyncRequest            = 109;
-   optional CreateSnapshotRequest            CreateSnapshotRequest          = 110;
++
+   optional EchoRPCRequest                   EchoRPCRequest                 = 110;
++
++  optional CreateSnapshotRequest            CreateSnapshotRequest          = 111;
++
  }
  
  message OMResponse {
@@@ -325,7 -325,7 +332,11 @@@
  
    optional SetRangerServiceVersionResponse   SetRangerServiceVersionResponse = 107;
    optional RangerBGSyncResponse              RangerBGSyncResponse          = 109;
-   optional CreateSnapshotResponse            CreateSnapshotResponse        = 110;
++
+   optional EchoRPCResponse                   EchoRPCResponse               = 110;
++
++  optional CreateSnapshotResponse            CreateSnapshotResponse        = 111;
++
  }
  
  enum Status {
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
index d901336223,4ea8c52bf4..a2de68dcc2
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java
@@@ -171,17 -161,37 +159,17 @@@ public class KeyManagerImpl implements 
  
    private BackgroundService openKeyCleanupService;
  
 -  @VisibleForTesting
 -  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
 -      OMMetadataManager metadataManager, OzoneConfiguration conf,
 -      OzoneBlockTokenSecretManager secretManager,
 -      OMPerformanceMetrics metrics) {
 -    this(null, new ScmClient(scmBlockClient, null, conf), metadataManager,
 -        conf, secretManager, null, metrics);
 -  }
 -
 -  @VisibleForTesting
 -  public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
 -      StorageContainerLocationProtocol scmContainerClient,
 -      OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
 -      OzoneBlockTokenSecretManager secretManager,
 -      OMPerformanceMetrics metrics) {
 -    this(null, new ScmClient(scmBlockClient, scmContainerClient, conf),
 -        metadataManager, conf, secretManager, null,
 -        metrics);
 -  }
 -
    public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
-       OzoneConfiguration conf, String omId) {
-     this (om, scmClient, om.getMetadataManager(), conf, omId,
-         om.getBlockTokenMgr(), om.getKmsProvider(), om.getPrefixManager());
+       OzoneConfiguration conf, OMPerformanceMetrics metrics) {
+     this (om, scmClient, om.getMetadataManager(), conf,
+         om.getBlockTokenMgr(), om.getKmsProvider(), metrics);
    }
  
-   @SuppressWarnings("parameternumber")
    public KeyManagerImpl(OzoneManager om, ScmClient scmClient,
-       OMMetadataManager metadataManager, OzoneConfiguration conf, String omId,
+       OMMetadataManager metadataManager, OzoneConfiguration conf,
        OzoneBlockTokenSecretManager secretManager,
-       KeyProviderCryptoExtension kmsProvider, PrefixManager prefixManager) {
+       KeyProviderCryptoExtension kmsProvider,
+       OMPerformanceMetrics metrics) {
      this.scmBlockSize = (long) conf
          .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
              StorageUnit.BYTES);
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
index d3a14e267f,0000000000..42771f66f9
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
@@@ -1,499 -1,0 +1,501 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with this
 + * work for additional information regarding copyright ownership.  The ASF
 + * licenses this file to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + * <p>
 + * http://www.apache.org/licenses/LICENSE-2.0
 + * <p>
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
 + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 + * License for the specific language governing permissions and limitations under
 + * the License.
 + */
 +
 +package org.apache.hadoop.ozone.om;
 +
 +import java.io.IOException;
 +import org.apache.commons.lang3.tuple.Pair;
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
++import org.apache.hadoop.hdds.server.OzoneAdmins;
 +import org.apache.hadoop.ipc.ProtobufRpcEngine;
 +import org.apache.hadoop.ipc.Server;
 +import org.apache.hadoop.ozone.OzoneAcl;
 +import org.apache.hadoop.ozone.OzoneConsts;
 +import org.apache.hadoop.ozone.audit.AuditAction;
 +import org.apache.hadoop.ozone.audit.AuditEventStatus;
 +import org.apache.hadoop.ozone.audit.AuditLogger;
 +import org.apache.hadoop.ozone.audit.AuditMessage;
 +import org.apache.hadoop.ozone.audit.Auditor;
 +import org.apache.hadoop.ozone.audit.OMAction;
 +import org.apache.hadoop.ozone.om.exceptions.OMException;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 +import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 +import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 +import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
 +import org.apache.hadoop.ozone.security.acl.RequestContext;
 +import org.apache.hadoop.security.UserGroupInformation;
 +import org.apache.hadoop.util.ReflectionUtils;
 +import org.slf4j.Logger;
 +import java.net.InetAddress;
 +import java.util.List;
 +import java.util.Map;
 +
 +import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName;
 +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
 +import static org.apache.hadoop.ozone.om.KeyManagerImpl.getRemoteUser;
 +import static org.apache.hadoop.ozone.om.OzoneManager.getS3Auth;
 +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_REQUEST;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
 +import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
 +import org.apache.hadoop.ozone.security.acl.OzoneAccessAuthorizer;
 +import org.apache.hadoop.ozone.security.acl.OzoneNativeAuthorizer;
 +import org.apache.hadoop.ozone.security.acl.OzoneObj;
 +import org.apache.hadoop.ozone.security.acl.OzoneObj.ResourceType;
 +import org.apache.hadoop.ozone.security.acl.OzoneObj.StoreType;
 +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 +
 +/**
 + * OM Metadata Reading class for the OM and Snapshot managers.
 + *
 + * This abstraction manages all the metadata key/acl reading
 + * from a rocksDb instance, for both the OM and OM snapshots.
 + */
 +public class OmMetadataReader implements IOmMetadataReader, Auditor {
 +  private final KeyManager keyManager;
 +  private final PrefixManager prefixManager;
 +  private final VolumeManager volumeManager;
 +  private final BucketManager bucketManager;
 +  private final OzoneManager ozoneManager;
 +  private final boolean isAclEnabled;
 +  private final IAccessAuthorizer accessAuthorizer;
 +  private final boolean isNativeAuthorizerEnabled;
 +  private final OmMetadataReaderMetrics metrics;
 +  private final Logger log;
 +  private final AuditLogger audit;
 +
 +  public OmMetadataReader(KeyManager keyManager,
 +                   PrefixManager prefixManager,
 +                   OzoneManager ozoneManager,
 +                   Logger log,
 +                   AuditLogger audit,
 +                   OmMetadataReaderMetrics omMetadataReaderMetrics) {
 +    this.keyManager = keyManager;
 +    this.bucketManager = ozoneManager.getBucketManager();
 +    this.volumeManager = ozoneManager.getVolumeManager();
 +    this.prefixManager = prefixManager;
 +    OzoneConfiguration configuration = ozoneManager.getConfiguration();
 +    this.ozoneManager = ozoneManager;
 +    this.isAclEnabled = ozoneManager.getAclsEnabled();
 +    this.log = log;
 +    this.audit = audit;
 +    boolean allowListAllVolumes = ozoneManager.getAllowListAllVolumes();
 +    metrics = omMetadataReaderMetrics;
 +    if (isAclEnabled) {
 +      accessAuthorizer = getACLAuthorizerInstance(configuration);
 +      if (accessAuthorizer instanceof OzoneNativeAuthorizer) {
 +        OzoneNativeAuthorizer authorizer =
 +            (OzoneNativeAuthorizer) accessAuthorizer;
 +        isNativeAuthorizerEnabled = true;
 +        authorizer.setVolumeManager(volumeManager);
 +        authorizer.setBucketManager(bucketManager);
 +        authorizer.setKeyManager(keyManager);
 +        authorizer.setPrefixManager(prefixManager);
-         authorizer.setOzoneAdmins(ozoneManager.getOmAdminUsernames());
++        authorizer.setOzoneAdmins(
++            new OzoneAdmins(ozoneManager.getOmAdminUsernames()));
 +        authorizer.setAllowListAllVolumes(allowListAllVolumes);
 +      } else {
 +        isNativeAuthorizerEnabled = false;
 +      }
 +    } else {
 +      accessAuthorizer = null;
 +      isNativeAuthorizerEnabled = false;
 +    }
 +  }
 +
 +  /**
 +   * Lookup a key.
 +   *
 +   * @param args - attributes of the key.
 +   * @return OmKeyInfo - the info about the requested key.
 +   * @throws IOException
 +   */
 +  @Override
 +  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
 +    if (isAclEnabled) {
 +      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
 +          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
 +    }
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
 +      metrics.incNumKeyLookups();
 +      return keyManager.lookupKey(args, getClientAddress());
 +    } catch (Exception ex) {
 +      metrics.incNumKeyLookupFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.READ_KEY,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(OMAction.READ_KEY,
 +            auditMap));
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
 +      String startKey, long numEntries, boolean allowPartialPrefixes)
 +      throws IOException {
 +
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
 +    if (isAclEnabled) {
 +      checkAcls(getResourceType(args), StoreType.OZONE, ACLType.READ,
 +          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
 +    }
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
 +      metrics.incNumListStatus();
 +      return keyManager.listStatus(args, recursive, startKey, numEntries,
 +              getClientAddress(), allowPartialPrefixes);
 +    } catch (Exception ex) {
 +      metrics.incNumListStatusFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_STATUS,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(
 +            OMAction.LIST_STATUS, auditMap));
 +      }
 +    }
 +  }
 +  
 +  @Override
 +  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
 +      metrics.incNumGetFileStatus();
 +      return keyManager.getFileStatus(args, getClientAddress());
 +    } catch (IOException ex) {
 +      metrics.incNumGetFileStatusFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(
 +          buildAuditMessageForFailure(OMAction.GET_FILE_STATUS, auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(
 +            buildAuditMessageForSuccess(OMAction.GET_FILE_STATUS, auditMap));
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 +
 +    if (isAclEnabled) {
 +      checkAcls(ResourceType.KEY, StoreType.OZONE, ACLType.READ,
 +          bucket.realVolume(), bucket.realBucket(), args.getKeyName());
 +    }
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit(args.toAuditMap());
 +
 +    args = bucket.update(args);
 +
 +    try {
 +      metrics.incNumLookupFile();
 +      return keyManager.lookupFile(args, getClientAddress());
 +    } catch (Exception ex) {
 +      metrics.incNumLookupFileFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LOOKUP_FILE,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(
 +            OMAction.LOOKUP_FILE, auditMap));
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
 +      String startKey, String keyPrefix, int maxKeys) throws IOException {
 +
 +    ResolvedBucket bucket = ozoneManager.resolveBucketLink(
 +        Pair.of(volumeName, bucketName));
 +
 +    if (isAclEnabled) {
 +      checkAcls(ResourceType.BUCKET, StoreType.OZONE, ACLType.LIST,
 +          bucket.realVolume(), bucket.realBucket(), keyPrefix);
 +    }
 +
 +    boolean auditSuccess = true;
 +    Map<String, String> auditMap = bucket.audit();
 +    auditMap.put(OzoneConsts.START_KEY, startKey);
 +    auditMap.put(OzoneConsts.MAX_KEYS, String.valueOf(maxKeys));
 +    auditMap.put(OzoneConsts.KEY_PREFIX, keyPrefix);
 +
 +    try {
 +      metrics.incNumKeyLists();
 +      return keyManager.listKeys(bucket.realVolume(), bucket.realBucket(),
 +          startKey, keyPrefix, maxKeys);
 +    } catch (IOException ex) {
 +      metrics.incNumKeyListFails();
 +      auditSuccess = false;
 +      audit.logReadFailure(buildAuditMessageForFailure(OMAction.LIST_KEYS,
 +          auditMap, ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(buildAuditMessageForSuccess(OMAction.LIST_KEYS,
 +            auditMap));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Returns list of ACLs for given Ozone object.
 +   *
 +   * @param obj Ozone object.
 +   * @throws IOException if there is error.
 +   */
 +  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
 +    boolean auditSuccess = true;
 +
 +    try {
 +      if (isAclEnabled) {
 +        checkAcls(obj.getResourceType(), obj.getStoreType(), ACLType.READ_ACL,
 +            obj.getVolumeName(), obj.getBucketName(), obj.getKeyName());
 +      }
 +      metrics.incNumGetAcl();
 +      switch (obj.getResourceType()) {
 +      case VOLUME:
 +        return volumeManager.getAcl(obj);
 +      case BUCKET:
 +        return bucketManager.getAcl(obj);
 +      case KEY:
 +        return keyManager.getAcl(obj);
 +      case PREFIX:
 +        return prefixManager.getAcl(obj);
 +
 +      default:
 +        throw new OMException("Unexpected resource type: " +
 +            obj.getResourceType(), INVALID_REQUEST);
 +      }
 +    } catch (Exception ex) {
 +      auditSuccess = false;
 +      audit.logReadFailure(
 +          buildAuditMessageForFailure(OMAction.GET_ACL, obj.toAuditMap(), ex));
 +      throw ex;
 +    } finally {
 +      if (auditSuccess) {
 +        audit.logReadSuccess(
 +            buildAuditMessageForSuccess(OMAction.GET_ACL, obj.toAuditMap()));
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Checks if current caller has acl permissions.
 +   *
 +   * @param resType - Type of ozone resource. Ex volume, bucket.
 +   * @param store   - Store type. i.e Ozone, S3.
 +   * @param acl     - type of access to be checked.
 +   * @param vol     - name of volume
 +   * @param bucket  - bucket name
 +   * @param key     - key
 +   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied.
 +   */
 +  void checkAcls(ResourceType resType, StoreType store,
 +      ACLType acl, String vol, String bucket, String key)
 +      throws IOException {
 +    UserGroupInformation user;
 +    if (getS3Auth() != null) {
 +      String principal =
 +          OzoneAclUtils.accessIdToUserPrincipal(getS3Auth().getAccessId());
 +      user = UserGroupInformation.createRemoteUser(principal);
 +    } else {
 +      user = ProtobufRpcEngine.Server.getRemoteUser();
 +    }
 +
 +    InetAddress remoteIp = ProtobufRpcEngine.Server.getRemoteIp();
 +    String volumeOwner = ozoneManager.getVolumeOwner(vol, acl, resType);
 +    String bucketOwner = ozoneManager.getBucketOwner(vol, bucket, acl, resType);
 +
 +    OzoneAclUtils.checkAllAcls(this, resType, store, acl,
 +        vol, bucket, key, volumeOwner, bucketOwner,
 +        user != null ? user : getRemoteUser(),
 +        remoteIp != null ? remoteIp :
 +            ozoneManager.getOmRpcServerAddr().getAddress(),
 +        remoteIp != null ? remoteIp.getHostName() :
 +            ozoneManager.getOmRpcServerAddr().getHostName());
 +  }
 +
 +  
 +  /**
 +   * CheckAcls for the ozone object.
 +   *
 +   * @return true if permission granted, false if permission denied.
 +   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
 +   *                     and throwOnPermissionDenied set to true.
 +   */
 +  @SuppressWarnings("parameternumber")
 +  public boolean checkAcls(ResourceType resType, StoreType storeType,
 +      ACLType aclType, String vol, String bucket, String key,
 +      UserGroupInformation ugi, InetAddress remoteAddress, String hostName,
 +      boolean throwIfPermissionDenied, String owner)
 +      throws OMException {
 +    OzoneObj obj = OzoneObjInfo.Builder.newBuilder()
 +        .setResType(resType)
 +        .setStoreType(storeType)
 +        .setVolumeName(vol)
 +        .setBucketName(bucket)
 +        .setKeyName(key).build();
 +    RequestContext context = RequestContext.newBuilder()
 +        .setClientUgi(ugi)
 +        .setIp(remoteAddress)
 +        .setHost(hostName)
 +        .setAclType(ACLIdentityType.USER)
 +        .setAclRights(aclType)
 +        .setOwnerName(owner)
 +        .build();
 +
 +    return checkAcls(obj, context, throwIfPermissionDenied);
 +  }
 +
 +  /**
 +   * CheckAcls for the ozone object.
 +   *
 +   * @return true if permission granted, false if permission denied.
 +   * @throws OMException ResultCodes.PERMISSION_DENIED if permission denied
 +   *                     and throwOnPermissionDenied set to true.
 +   */
 +  public boolean checkAcls(OzoneObj obj, RequestContext context,
 +                           boolean throwIfPermissionDenied)
 +      throws OMException {
 +
 +    if (!accessAuthorizer.checkAccess(obj, context)) {
 +      if (throwIfPermissionDenied) {
 +        String volumeName = obj.getVolumeName() != null ?
 +                "Volume:" + obj.getVolumeName() + " " : "";
 +        String bucketName = obj.getBucketName() != null ?
 +                "Bucket:" + obj.getBucketName() + " " : "";
 +        String keyName = obj.getKeyName() != null ?
 +                "Key:" + obj.getKeyName() : "";
 +        log.warn("User {} doesn't have {} permission to access {} {}{}{}",
 +            context.getClientUgi().getUserName(), context.getAclRights(),
 +            obj.getResourceType(), volumeName, bucketName, keyName);
 +        throw new OMException("User " + context.getClientUgi().getUserName() +
 +            " doesn't have " + context.getAclRights() +
 +            " permission to access " + obj.getResourceType() + " " +
 +            volumeName  + bucketName + keyName, ResultCodes.PERMISSION_DENIED);
 +      }
 +      return false;
 +    } else {
 +      return true;
 +    }
 +  }
 +
 +  /**
 +   * Returns an instance of {@link IAccessAuthorizer}.
 +   * Looks up the configuration to see if there is custom class specified.
 +   * Constructs the instance by passing the configuration directly to the
 +   * constructor to achieve thread safety using final fields.
 +   *
 +   * @param conf
 +   * @return IAccessAuthorizer
 +   */
 +  private IAccessAuthorizer getACLAuthorizerInstance(OzoneConfiguration conf) {
 +    Class<? extends IAccessAuthorizer> clazz = conf.getClass(
 +        OZONE_ACL_AUTHORIZER_CLASS, OzoneAccessAuthorizer.class,
 +        IAccessAuthorizer.class);
 +    return ReflectionUtils.newInstance(clazz, conf);
 +  }
 +
 +  private static String getClientAddress() {
 +    String clientMachine = Server.getRemoteAddress();
 +    if (clientMachine == null) { //not a RPC client
 +      clientMachine = "";
 +    }
 +    return clientMachine;
 +  }
 +
 +  @Override
 +  public AuditMessage buildAuditMessageForSuccess(AuditAction op,
 +      Map<String, String> auditMap) {
 +
 +    return new AuditMessage.Builder()
 +        .setUser(getRemoteUserName())
 +        .atIp(Server.getRemoteAddress())
 +        .forOperation(op)
 +        .withParams(auditMap)
 +        .withResult(AuditEventStatus.SUCCESS)
 +        .build();
 +  }
 +
 +  @Override
 +  public AuditMessage buildAuditMessageForFailure(AuditAction op,
 +      Map<String, String> auditMap, Throwable throwable) {
 +
 +    return new AuditMessage.Builder()
 +        .setUser(getRemoteUserName())
 +        .atIp(Server.getRemoteAddress())
 +        .forOperation(op)
 +        .withParams(auditMap)
 +        .withResult(AuditEventStatus.FAILURE)
 +        .withException(throwable)
 +        .build();
 +  }
 +
 +  /**
 +   * Returns true if OzoneNativeAuthorizer is enabled and false if otherwise.
 +   *
 +   * @return if native authorizer is enabled.
 +   */
 +  public boolean isNativeAuthorizerEnabled() {
 +    return isNativeAuthorizerEnabled;
 +  }
 +
 +  public IAccessAuthorizer getAccessAuthorizer() {
 +    return accessAuthorizer;
 +  }
 +
 +  private ResourceType getResourceType(OmKeyArgs args) {
 +    if (args.getKeyName() == null || args.getKeyName().length() == 0) {
 +      return ResourceType.BUCKET;
 +    }
 +    return ResourceType.KEY;
 +  }
 +
 +  
 +}
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
index dcbed30bd4,0000000000..ffdd136877
mode 100644,000000..100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@@ -1,199 -1,0 +1,199 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + *  with the License.  You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + *  Unless required by applicable law or agreed to in writing, software
 + *  distributed under the License is distributed on an "AS IS" BASIS,
 + *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + *  See the License for the specific language governing permissions and
 + *  limitations under the License.
 + */
 +
 +package org.apache.hadoop.ozone.om;
 +
 +import com.google.common.cache.CacheBuilder;
 +import com.google.common.cache.CacheLoader;
 +import com.google.common.cache.LoadingCache;
 +import com.google.common.cache.RemovalListener;
 +import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
 +import org.apache.hadoop.hdds.utils.db.RDBStore;
 +import org.apache.hadoop.ozone.OzoneConfigKeys;
 +import org.apache.hadoop.ozone.om.exceptions.OMException;
 +import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 +
 +import java.io.IOException;
 +
 +import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import javax.annotation.Nonnull;
 +import java.util.concurrent.ExecutionException;
 +
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 +import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
 +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KEY_NAME;
 +import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
 +
 +
 +/**
 + * This class is used to manage/create OM snapshots.
 + */
 +public final class OmSnapshotManager {
 +  private final OzoneManager ozoneManager;
 +  private final LoadingCache<String, OmSnapshot> snapshotCache;
 +
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(OmSnapshotManager.class);
 +
 +  OmSnapshotManager(OzoneManager ozoneManager) {
 +    this.ozoneManager = ozoneManager;
 +
 +    // size of lru cache
 +    int cacheSize = ozoneManager.getConfiguration().getInt(
 +        OzoneConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE,
 +        OzoneConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT);
 +
 +    CacheLoader<String, OmSnapshot> loader;
 +    loader = new CacheLoader<String, OmSnapshot>() {
 +      @Override
 +
 +      // load the snapshot into the cache if not already there
 +      @Nonnull
 +      public OmSnapshot load(@Nonnull String snapshotTableKey)
 +          throws IOException {
 +        SnapshotInfo snapshotInfo;
 +        // see if the snapshot exists
 +        snapshotInfo = getSnapshotInfo(snapshotTableKey);
 +
 +        // read in the snapshot
 +        OzoneConfiguration conf = ozoneManager.getConfiguration();
 +        OMMetadataManager snapshotMetadataManager;
 +
 +        // Create the snapshot metadata manager by finding the corresponding
 +        // RocksDB instance, creating an OmMetadataManagerImpl instance based on
 +        // that
 +        try {
 +          snapshotMetadataManager = OmMetadataManagerImpl
 +              .createSnapshotMetadataManager(
 +              conf, snapshotInfo.getCheckpointDirName());
 +        } catch (IOException e) {
 +          LOG.error("Failed to retrieve snapshot: {}, {}", snapshotTableKey, e);
 +          throw e;
 +        }
 +
 +        // create the other manager instances based on snapshot metadataManager
 +        PrefixManagerImpl pm = new PrefixManagerImpl(snapshotMetadataManager,
 +            false);
 +        KeyManagerImpl km = new KeyManagerImpl(null,
-             ozoneManager.getScmClient(), snapshotMetadataManager, conf, null,
++            ozoneManager.getScmClient(), snapshotMetadataManager, conf,
 +            ozoneManager.getBlockTokenSecretManager(),
-             ozoneManager.getKmsProvider(), pm);
++            ozoneManager.getKmsProvider(), ozoneManager.getPerfMetrics());
 +
 +        return new OmSnapshot(km, pm, ozoneManager,
 +            snapshotInfo.getVolumeName(),
 +            snapshotInfo.getBucketName(),
 +            snapshotInfo.getName());
 +      }
 +    };
 +
 +    RemovalListener<String, OmSnapshot> removalListener
 +        = notification -> {
 +          try {
 +            // close snapshot's rocksdb on eviction
 +            notification.getValue().close();
 +          } catch (IOException e) {
 +            LOG.error("Failed to close snapshot: {} {}",
 +                notification.getKey(), e);
 +          }
 +        };
 +    // init LRU cache
 +    snapshotCache = CacheBuilder.newBuilder()
 +        .maximumSize(cacheSize)
 +        .removalListener(removalListener)
 +        .build(loader);
 +  }
 +
 +  /**
 +   * Creates snapshot checkpoint that corresponds to snapshotInfo.
 +   * @param omMetadataManager the metadata manager
 +   * @param snapshotInfo The metadata of snapshot to be created
 +   * @return instance of DBCheckpoint
 +   */
 +  public static DBCheckpoint createOmSnapshotCheckpoint(
 +      OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo)
 +      throws IOException {
 +    RDBStore store = (RDBStore) omMetadataManager.getStore();
 +    return store.getSnapshot(snapshotInfo.getCheckpointDirName());
 +  }
 +
 +  // Get OmSnapshot if the keyname has ".snapshot" key indicator
 +  public IOmMetadataReader checkForSnapshot(String volumeName,
 +                                     String bucketName, String keyname)
 +      throws IOException {
 +    if (keyname == null) {
 +      return ozoneManager.getOmMetadataReader();
 +    }
 +
 +    // see if key is for a snapshot
 +    String[] keyParts = keyname.split("/");
 +    if (isSnapshotKey(keyParts)) {
 +      String snapshotName = keyParts[1];
 +      if (snapshotName == null || snapshotName.isEmpty()) {
 +        // don't allow snapshot indicator without snapshot name
 +        throw new OMException(INVALID_KEY_NAME);
 +      }
 +      String snapshotTableKey = SnapshotInfo.getTableKey(volumeName,
 +          bucketName, snapshotName);
 +
 +      // retrieve the snapshot from the cache
 +      try {
 +        return snapshotCache.get(snapshotTableKey);
 +      } catch (ExecutionException e) {
 +        throw new IOException(e.getCause());
 +      }
 +    } else {
 +      return ozoneManager.getOmMetadataReader();
 +    }
 +  }
 +
 +  public SnapshotInfo getSnapshotInfo(String volumeName,
 +                                      String bucketName, String snapshotName)
 +      throws IOException {
 +    return getSnapshotInfo(SnapshotInfo.getTableKey(volumeName,
 +        bucketName, snapshotName));
 +  }
 +
 +  private SnapshotInfo getSnapshotInfo(String key) throws IOException {
 +    SnapshotInfo snapshotInfo;
 +    try {
 +      snapshotInfo = ozoneManager.getMetadataManager()
 +        .getSnapshotInfoTable()
 +        .get(key);
 +    } catch (IOException e) {
 +      LOG.error("Snapshot {}: not found: {}", key, e);
 +      throw e;
 +    }
 +    if (snapshotInfo == null) {
 +      throw new OMException(KEY_NOT_FOUND);
 +    }
 +    return snapshotInfo;
 +  }
 +
 +  public static String getSnapshotPrefix(String snapshotName) {
 +    return OM_SNAPSHOT_INDICATOR + OM_KEY_PREFIX +
 +        snapshotName + OM_KEY_PREFIX;
 +  }
 +
 +  public static boolean isSnapshotKey(String[] keyParts) {
 +    return (keyParts.length > 1) &&
 +        (keyParts[0].compareTo(OM_SNAPSHOT_INDICATOR) == 0);
 +  }
 +}
diff --cc hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 9664e602f4,8d6ae64ddf..8013e1bd15
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@@ -215,10 -222,9 +217,8 @@@ import static org.apache.hadoop.ozone.O
  import static org.apache.hadoop.ozone.OzoneAcl.AclScope.ACCESS;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_RATIS_ENABLED_KEY;
 -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED_DEFAULT;
- import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
- import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS_WILDCARD;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FLEXIBLE_FQDN_RESOLUTION_ENABLED_DEFAULT;
  import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
@@@ -565,10 -574,18 +569,19 @@@ public final class OzoneManager extend
      this.isS3MultiTenancyEnabled =
          OMMultiTenantManager.checkAndEnableMultiTenancy(this, conf);
  
-     // Get admin list
-     omAdminUsernames = getOzoneAdminsFromConfig(configuration);
- 
 +    metrics = OMMetrics.create();
+     // Get admin list
+     Collection<String> omAdminUsernames =
+         OzoneConfigUtil.getOzoneAdminsFromConfig(configuration);
+     Collection<String> omAdminGroups =
+         OzoneConfigUtil.getOzoneAdminsGroupsFromConfig(configuration);
+     omAdmins = new OzoneAdmins(omAdminUsernames, omAdminGroups);
+ 
+     Collection<String> s3AdminUsernames =
+             OzoneConfigUtil.getS3AdminsFromConfig(configuration);
+     Collection<String> s3AdminGroups =
+             OzoneConfigUtil.getS3AdminsGroupsFromConfig(configuration);
+     s3OzoneAdmins = new OzoneAdmins(s3AdminUsernames, s3AdminGroups);
      instantiateServices(false);
  
      // Create special volume s3v which is required for S3G.
@@@ -704,11 -722,9 +717,12 @@@
      }
  
      prefixManager = new PrefixManagerImpl(metadataManager, isRatisEnabled);
+     perfMetrics = OMPerformanceMetrics.register();
      keyManager = new KeyManagerImpl(this, scmClient, configuration,
-         omStorage.getOmId());
 -            perfMetrics);
++        perfMetrics);
 +    omMetadataReader = new OmMetadataReader(keyManager, prefixManager,
 +        this, LOG, AUDIT, metrics);
 +    omSnapshotManager = new OmSnapshotManager(this);
  
      if (withNewSnapshot) {
        Integer layoutVersionInDB = getLayoutVersionInDB();
@@@ -1423,6 -1464,6 +1437,10 @@@
      return metrics;
    }
  
++  public OMPerformanceMetrics getPerfMetrics() {
++    return perfMetrics;
++  }
++
    /**
     * Start service.
     */
@@@ -3805,24 -4109,22 +3834,13 @@@
     * @param callerUgi Caller UserGroupInformation
     */
    public boolean isAdmin(UserGroupInformation callerUgi) {
-     if (callerUgi == null) {
-       return false;
-     } else {
-       return isAdmin(callerUgi.getShortUserName())
-           || isAdmin(callerUgi.getUserName());
-     }
+     return callerUgi != null && omAdmins.isAdmin(callerUgi);
    }
  
-   @VisibleForTesting
-   private boolean isAdmin(String username) {
-     if (omAdminUsernames == null) {
-       return false;
-     } else {
-       return omAdminUsernames.contains(OZONE_ADMINISTRATORS_WILDCARD) ||
-           omAdminUsernames.contains(username);
-     }
+   public boolean isS3Admin(UserGroupInformation callerUgi) {
+     return callerUgi != null && s3OzoneAdmins.isAdmin(callerUgi);
    }
  
 -  /**
 -   * Returns true if OzoneNativeAuthorizer is enabled and false if otherwise.
 -   *
 -   * @return if native authorizer is enabled.
 -   */
 -  public boolean isNativeAuthorizerEnabled() {
 -    return isNativeAuthorizerEnabled;
 -  }
 -
    @VisibleForTesting
    public boolean isRunning() {
      return omState == State.RUNNING;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org