You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by pr...@apache.org on 2022/09/30 01:07:28 UTC

[ozone] branch HDDS-6517-Snapshot updated: HDDS-6855. [SNAPSHOTS] Path Based Access for Ozone Snapshots. (#3729)

This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 6eb19a7710 HDDS-6855. [SNAPSHOTS] Path Based Access for Ozone Snapshots. (#3729)
6eb19a7710 is described below

commit 6eb19a7710065cee1c969746a917fbd7f2c3c79f
Author: GeorgeJahad <gi...@blackbirdsystems.net>
AuthorDate: Thu Sep 29 18:07:18 2022 -0700

    HDDS-6855. [SNAPSHOTS] Path Based Access for Ozone Snapshots. (#3729)
    
    * updated feature branch
    
    * cleanup
    
    * checkstyle
    
    * cleaned up Bucket handling
    
    * trigger new CI check
    
    * trigger new CI check
    
    * trigger new CI check
    
    * trigger new CI check
    
    * remove unneeded delete test
    
    * trigger new CI check
    
    * Update hadoop-hdds/common/src/main/resources/ozone-default.xml
    
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    
    * Update hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/IOmMetadataReader.java
    
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    
    * Update hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
    
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    
    * Update hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
    
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    
    * Update hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
    
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    
    * Update hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
    
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
    
    * fixups
    
    * add TimeUnit
    
    * trigger new CI check
    
    * renamed OmLock to IOzoneManagerLock
    
    * cleanup
    
    * cleanup
    
    * fix timeout
    
    * remove failcount
    
    * checkstyle
    
    * now close on eviction
    
    * fix merge conflict
    
    * trigger new CI check
    
    Co-authored-by: George Jahad <ge...@georgejahad.com>
    Co-authored-by: Siyao Meng <50...@users.noreply.github.com>
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   4 +
 .../common/src/main/resources/ozone-default.xml    |   9 +
 .../apache/hadoop/ozone/client/OzoneBucket.java    |   7 +-
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |   6 +
 .../apache/hadoop/ozone/om/IOmMetadataReader.java  | 118 +++++
 .../hadoop/ozone/om/helpers/SnapshotInfo.java      |   8 +-
 .../hadoop/ozone/om/lock/IOzoneManagerLock.java    |  98 ++++
 .../hadoop/ozone/om/lock/OmReadOnlyLock.java       | 167 ++++++
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     |  28 +-
 .../ozone/om/protocol/OzoneManagerProtocol.java    | 102 +---
 .../org/apache/hadoop/ozone/om/TestOmSnapshot.java | 420 +++++++++++++++
 .../hadoop/ozone/om/TestOmSnapshotFileSystem.java  | 581 +++++++++++++++++++++
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   4 +-
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  40 +-
 .../apache/hadoop/ozone/om/OmMetadataReader.java   |  12 +-
 .../org/apache/hadoop/ozone/om/OmSnapshot.java     | 242 +++++++++
 .../apache/hadoop/ozone/om/OmSnapshotManager.java  | 199 +++++++
 .../apache/hadoop/ozone/om/OmSnapshotMetrics.java  | 147 ++++++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  39 +-
 .../apache/hadoop/ozone/om/SnapshotManager.java    |  48 --
 .../snapshot/OMSnapshotCreateResponse.java         |   5 +-
 .../hadoop/ozone/om/TestOmSnapshotManager.java     | 125 +++++
 .../snapshot/TestOMSnapshotCreateRequest.java      |  15 +-
 23 files changed, 2238 insertions(+), 186 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 9894298c41..09370aed56 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -511,6 +511,10 @@ public final class OzoneConfigKeys {
 
   public static final String OZONE_AUDIT_LOG_DEBUG_CMD_LIST_OMAUDIT =
       "ozone.audit.log.debug.cmd.list.omaudit";
+
+  public static final String OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE =
+      "ozone.om.snapshot.cache.max.size";
+  public static final int OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT = 10;
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index ac2b5e204b..aba10935e9 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -3296,4 +3296,13 @@
       will create intermediate directories.
     </description>
   </property>
+  <property>
+    <name>ozone.om.snapshot.cache.max.size</name>
+    <value>10</value>
+    <tag>OZONE, OM</tag>
+    <description>
+      Size of the OM Snapshot LRU cache.  This is the maximum number of open OM Snapshot RocksDb instances
+      that will be held in memory at any time.
+    </description>
+  </property>
 </configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
index 596afa6371..b271308f2c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneBucket.java
@@ -1368,9 +1368,12 @@ public class OzoneBucket extends WithMetadata {
       addedKeyPrefix = true;
 
       // not required to addKeyPrefix
-      // case-1) if keyPrefix is null or empty
+      // case-1) if keyPrefix is null/empty/just contains snapshot indicator
+      // (The snapshot indicator is the null prefix equivalent for snapshot
+      //  reads.)
       // case-2) if startKey is not null or empty
-      if (StringUtils.isBlank(keyPrefix) || StringUtils.isNotBlank(startKey)) {
+      if (StringUtils.isBlank(keyPrefix) || StringUtils.isNotBlank(startKey) ||
+          OmUtils.isBucketSnapshotIndicator(keyPrefix)) {
         return;
       }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 36f04ae566..caa61e8d69 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -57,6 +57,7 @@ import static org.apache.hadoop.hdds.HddsUtils.getHostName;
 import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_BIND_HOST_DEFAULT;
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DECOMMISSIONED_NODES_KEY;
@@ -812,4 +813,9 @@ public final class OmUtils {
     printString.append("]");
     return printString.toString();
   }
+
+  // Key points to entire bucket's snapshot
+  public static boolean isBucketSnapshotIndicator(String key) {
+    return key.startsWith(OM_SNAPSHOT_INDICATOR) && key.split("/").length == 2;
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/IOmMetadataReader.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/IOmMetadataReader.java
new file mode 100644
index 0000000000..8d6beb47b3
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/IOmMetadataReader.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Protocol for OmMetadataReader's.
+ */
+public interface IOmMetadataReader {
+  /**
+   * Look up for the container of an existing key.
+   *
+   * @param args the args of the key.
+   * @return OmKeyInfo instance that client uses to talk to container.
+   */
+  OmKeyInfo lookupKey(OmKeyArgs args) throws IOException;
+
+  /**
+   * List the status for a file or a directory and its contents.
+   *
+   * @param args    Key args
+   * @param recursive  For a directory if true all the descendants of a
+   *                   particular directory are listed
+   * @param startKey   Key from which listing needs to start. If startKey exists
+   *                   its status is included in the final list.
+   * @param numEntries Number of entries to list from the start key
+   * @param allowPartialPrefixes if partial prefixes should be allowed,
+   *                             this is needed in context of ListKeys
+   * @return list of file status
+   */
+  List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+                                   String startKey, long numEntries,
+                                   boolean allowPartialPrefixes)
+      throws IOException;
+
+  default List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries)
+      throws IOException {
+    return listStatus(args, recursive, startKey, numEntries, false);
+  }
+
+  /**
+   * OzoneFS api to get file status for an entry.
+   *
+   * @param keyArgs Key args
+   * @throws OMException if file does not exist
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  OzoneFileStatus getFileStatus(OmKeyArgs keyArgs) throws IOException;
+  /**
+   * OzoneFS api to lookup for a file.
+   *
+   * @param args Key args
+   * @throws OMException if given key is not found or it is not a file
+   *                     if bucket does not exist
+   * @throws IOException if there is error in the db
+   *                     invalid arguments
+   */
+  OmKeyInfo lookupFile(OmKeyArgs args) throws IOException;
+
+  /**
+   * Returns a list of keys represented by {@link OmKeyInfo}
+   * in the given bucket. Argument volumeName, bucketName is required,
+   * others are optional.
+   *
+   * @param volumeName
+   *   the name of the volume.
+   * @param bucketName
+   *   the name of the bucket.
+   * @param startKey
+   *   the start key name, only the keys whose name is
+   *   after this value will be included in the result.
+   * @param keyPrefix
+   *   key name prefix, only the keys whose name has
+   *   this prefix will be included in the result.
+   * @param maxKeys
+   *   the maximum number of keys to return. It ensures
+   *   the size of the result will not exceed this limit.
+   * @return a list of keys.
+   */
+  List<OmKeyInfo> listKeys(String volumeName, String bucketName,
+                           String startKey, String keyPrefix, int maxKeys)
+      throws IOException;
+
+  /**
+   * Returns list of ACLs for given Ozone object.
+   *
+   * @param obj Ozone object.
+   * @throws IOException if there is error.
+   */
+  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index 23e72990cb..e4323c271b 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -414,7 +414,13 @@ public final class SnapshotInfo implements Auditable {
    * Get the table key for this snapshot.
    */
   public String getTableKey() {
-    return OM_KEY_PREFIX + snapshotPath + OM_KEY_PREFIX + name;
+    return getTableKey(volumeName, bucketName, name);
+  }
+
+  public static String getTableKey(String volumeName, String bucketName,
+      String snapshotName) {
+    return OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName
+        + OM_KEY_PREFIX + snapshotName;
   }
 
   /**
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
new file mode 100644
index 0000000000..5fc7f5e4ec
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/IOzoneManagerLock.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Interface for OM Metadata locks.
+ */
+public interface IOzoneManagerLock {
+  @Deprecated
+  boolean acquireLock(OzoneManagerLock.Resource resource, String... resources);
+
+  boolean acquireReadLock(OzoneManagerLock.Resource resource,
+                          String... resources);
+
+  boolean acquireReadHashedLock(OzoneManagerLock.Resource resource,
+                           String resourceName);
+
+  boolean acquireWriteLock(OzoneManagerLock.Resource resource,
+                           String... resources);
+
+  boolean acquireWriteHashedLock(OzoneManagerLock.Resource resource,
+                                 String resourceName);
+
+  String generateResourceName(OzoneManagerLock.Resource resource,
+                        String... resources);
+
+  boolean acquireMultiUserLock(String firstUser, String secondUser);
+
+  void releaseMultiUserLock(String firstUser, String secondUser);
+
+  void releaseWriteLock(OzoneManagerLock.Resource resource,
+                        String... resources);
+
+  void releaseWriteHashedLock(OzoneManagerLock.Resource resource,
+                        String resourceName);
+
+  void releaseReadLock(OzoneManagerLock.Resource resource, String... resources);
+
+  void releaseReadHashedLock(OzoneManagerLock.Resource resource,
+                        String resourceName);
+
+  @Deprecated
+  void releaseLock(OzoneManagerLock.Resource resource, String... resources);
+
+  @VisibleForTesting
+  int getReadHoldCount(String resourceName);
+
+  @VisibleForTesting
+  String getReadLockWaitingTimeMsStat();
+
+  @VisibleForTesting
+  long getLongestReadLockWaitingTimeMs();
+
+  @VisibleForTesting
+  String getReadLockHeldTimeMsStat();
+
+  @VisibleForTesting
+  long getLongestReadLockHeldTimeMs();
+
+  @VisibleForTesting
+  int getWriteHoldCount(String resourceName);
+
+  @VisibleForTesting
+  boolean isWriteLockedByCurrentThread(String resourceName);
+
+  @VisibleForTesting
+  String getWriteLockWaitingTimeMsStat();
+
+  @VisibleForTesting
+  long getLongestWriteLockWaitingTimeMs();
+
+  @VisibleForTesting
+  String getWriteLockHeldTimeMsStat();
+
+  @VisibleForTesting
+  long getLongestWriteLockHeldTimeMs();
+
+  void cleanup();
+
+  OMLockMetrics getOMLockMetrics();
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
new file mode 100644
index 0000000000..dc606bf13f
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OmReadOnlyLock.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.lock;
+
+/**
+ * Read only "lock" for snapshots
+ * Uses no lock.  Always returns true when aquiring
+ * read lock and false for write locks
+ */
+public class OmReadOnlyLock implements IOzoneManagerLock {
+  @Override
+  public boolean acquireLock(OzoneManagerLock.Resource resource,
+                             String... resources) {
+    return false;
+  }
+
+  @Override
+  public boolean acquireReadLock(OzoneManagerLock.Resource resource,
+                                 String... resources) {
+    return true;
+  }
+
+  @Override
+  public boolean acquireReadHashedLock(OzoneManagerLock.Resource resource,
+                                 String resourceName) {
+    return true;
+  }
+
+  @Override
+  public boolean acquireWriteLock(OzoneManagerLock.Resource resource,
+                                  String... resources) {
+    return false;
+  }
+
+  @Override
+  public boolean acquireWriteHashedLock(OzoneManagerLock.Resource resource,
+                                        String resourceName) {
+    return false;
+  }
+
+  @Override
+  public String generateResourceName(OzoneManagerLock.Resource resource,
+                                     String... resources) {
+    return "";
+  }
+
+  @Override
+  public boolean acquireMultiUserLock(String firstUser, String secondUser) {
+    return false;
+  }
+
+  @Override
+  public void releaseMultiUserLock(String firstUser, String secondUser) {
+  // Intentionally empty
+  }
+
+  @Override
+  public void releaseWriteLock(OzoneManagerLock.Resource resource,
+                               String... resources) {
+  // Intentionally empty
+  }
+
+  @Override
+  public void releaseWriteHashedLock(OzoneManagerLock.Resource resource,
+                               String resourceName) {
+  // Intentionally empty
+  }
+
+  @Override
+  public void releaseReadLock(OzoneManagerLock.Resource resource,
+                              String... resources) {
+  // Intentionally empty
+  }
+
+  @Override
+  public void releaseReadHashedLock(OzoneManagerLock.Resource resource,
+                               String resourceName) {
+  // Intentionally empty
+  }
+
+  @Override
+  public void releaseLock(OzoneManagerLock.Resource resource,
+                          String... resources) {
+  // Intentionally empty
+  }
+
+  @Override
+  public int getReadHoldCount(String resourceName) {
+    return 0;
+  }
+
+  @Override
+  public String getReadLockWaitingTimeMsStat() {
+    return "";
+  }
+
+  @Override
+  public long getLongestReadLockWaitingTimeMs() {
+    return 0;
+  }
+
+  @Override
+  public String getReadLockHeldTimeMsStat() {
+    return "";
+  }
+
+  @Override
+  public long getLongestReadLockHeldTimeMs() {
+    return 0;
+  }
+
+  @Override
+  public int getWriteHoldCount(String resourceName) {
+    return 0;
+  }
+
+  @Override
+  public boolean isWriteLockedByCurrentThread(String resourceName) {
+    return false;
+  }
+
+  @Override
+  public String getWriteLockWaitingTimeMsStat() {
+    return "";
+  }
+
+  @Override
+  public long getLongestWriteLockWaitingTimeMs() {
+    return 0;
+  }
+
+  @Override
+  public String getWriteLockHeldTimeMsStat() {
+    return "";
+  }
+
+  @Override
+  public long getLongestWriteLockHeldTimeMs() {
+    return 0;
+  }
+
+  @Override
+  public void cleanup() {
+  // Intentionally empty
+  }
+
+  @Override
+  public OMLockMetrics getOMLockMetrics() {
+    throw new UnsupportedOperationException(
+        "OmReadOnlyLock does not support this operation.");
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 353264ac49..b5cf45e043 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -77,7 +77,7 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_MANAGER_FAIR_LOCK;
  * <br>
  */
 
-public class OzoneManagerLock {
+public class OzoneManagerLock implements IOzoneManagerLock {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OzoneManagerLock.class);
@@ -118,6 +118,7 @@ public class OzoneManagerLock {
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
+  @Override
   @Deprecated
   public boolean acquireLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
@@ -141,11 +142,13 @@ public class OzoneManagerLock {
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
+  @Override
   public boolean acquireReadLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
     return lock(resource, resourceName, manager::readLock, READ_LOCK);
   }
 
+  @Override
   public boolean acquireReadHashedLock(Resource resource, String resourceName) {
     return lock(resource, resourceName, manager::readLock, READ_LOCK);
   }
@@ -167,11 +170,13 @@ public class OzoneManagerLock {
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
+  @Override
   public boolean acquireWriteLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
     return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
   }
 
+  @Override
   public boolean acquireWriteHashedLock(Resource resource,
                                         String resourceName) {
     return lock(resource, resourceName, manager::writeLock, WRITE_LOCK);
@@ -250,6 +255,7 @@ public class OzoneManagerLock {
    * @param resource
    * @param resources
    */
+  @Override
   public String generateResourceName(Resource resource, String... resources) {
     if (resources.length == 1 && resource != Resource.BUCKET_LOCK) {
       return OzoneManagerLockUtil.generateResourceLockName(resource,
@@ -293,6 +299,7 @@ public class OzoneManagerLock {
    * @param firstUser
    * @param secondUser
    */
+  @Override
   public boolean acquireMultiUserLock(String firstUser, String secondUser) {
     Resource resource = Resource.USER_LOCK;
     firstUser = generateResourceName(resource, firstUser);
@@ -357,6 +364,7 @@ public class OzoneManagerLock {
    * @param firstUser
    * @param secondUser
    */
+  @Override
   public void releaseMultiUserLock(String firstUser, String secondUser) {
     Resource resource = Resource.USER_LOCK;
     firstUser = generateResourceName(resource, firstUser);
@@ -394,11 +402,13 @@ public class OzoneManagerLock {
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
+  @Override
   public void releaseWriteLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
     unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
   }
 
+  @Override
   public void releaseWriteHashedLock(Resource resource, String resourceName) {
     unlock(resource, resourceName, manager::writeUnlock, WRITE_LOCK);
   }
@@ -411,11 +421,13 @@ public class OzoneManagerLock {
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
+  @Override
   public void releaseReadLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
     unlock(resource, resourceName, manager::readUnlock, READ_LOCK);
   }
 
+  @Override
   public void releaseReadHashedLock(Resource resource, String resourceName) {
     unlock(resource, resourceName, manager::readUnlock, READ_LOCK);
   }
@@ -428,6 +440,7 @@ public class OzoneManagerLock {
    * should be bucket name. For remaining all resource only one param should
    * be passed.
    */
+  @Override
   @Deprecated
   public void releaseLock(Resource resource, String... resources) {
     String resourceName = generateResourceName(resource, resources);
@@ -499,6 +512,7 @@ public class OzoneManagerLock {
    * @param resourceName resource lock name
    * @return readHoldCount
    */
+  @Override
   @VisibleForTesting
   public int getReadHoldCount(String resourceName) {
     return manager.getReadHoldCount(resourceName);
@@ -511,6 +525,7 @@ public class OzoneManagerLock {
    *
    * @return String representation of object
    */
+  @Override
   @VisibleForTesting
   public String getReadLockWaitingTimeMsStat() {
     return omLockMetrics.getReadLockWaitingTimeMsStat();
@@ -522,6 +537,7 @@ public class OzoneManagerLock {
    *
    * @return longest read lock waiting time (ms)
    */
+  @Override
   @VisibleForTesting
   public long getLongestReadLockWaitingTimeMs() {
     return omLockMetrics.getLongestReadLockWaitingTimeMs();
@@ -534,6 +550,7 @@ public class OzoneManagerLock {
    *
    * @return String representation of object
    */
+  @Override
   @VisibleForTesting
   public String getReadLockHeldTimeMsStat() {
     return omLockMetrics.getReadLockHeldTimeMsStat();
@@ -545,6 +562,7 @@ public class OzoneManagerLock {
    *
    * @return longest read lock held time (ms)
    */
+  @Override
   @VisibleForTesting
   public long getLongestReadLockHeldTimeMs() {
     return omLockMetrics.getLongestReadLockHeldTimeMs();
@@ -556,6 +574,7 @@ public class OzoneManagerLock {
    * @param resourceName resource lock name
    * @return writeHoldCount
    */
+  @Override
   @VisibleForTesting
   public int getWriteHoldCount(String resourceName) {
     return manager.getWriteHoldCount(resourceName);
@@ -569,6 +588,7 @@ public class OzoneManagerLock {
    * @return {@code true} if the current thread holds the write lock and
    *         {@code false} otherwise
    */
+  @Override
   @VisibleForTesting
   public boolean isWriteLockedByCurrentThread(String resourceName) {
     return manager.isWriteLockedByCurrentThread(resourceName);
@@ -581,6 +601,7 @@ public class OzoneManagerLock {
    *
    * @return String representation of object
    */
+  @Override
   @VisibleForTesting
   public String getWriteLockWaitingTimeMsStat() {
     return omLockMetrics.getWriteLockWaitingTimeMsStat();
@@ -592,6 +613,7 @@ public class OzoneManagerLock {
    *
    * @return longest write lock waiting time (ms)
    */
+  @Override
   @VisibleForTesting
   public long getLongestWriteLockWaitingTimeMs() {
     return omLockMetrics.getLongestWriteLockWaitingTimeMs();
@@ -604,6 +626,7 @@ public class OzoneManagerLock {
    *
    * @return String representation of object
    */
+  @Override
   @VisibleForTesting
   public String getWriteLockHeldTimeMsStat() {
     return omLockMetrics.getWriteLockHeldTimeMsStat();
@@ -615,6 +638,7 @@ public class OzoneManagerLock {
    *
    * @return longest write lock held time (ms)
    */
+  @Override
   @VisibleForTesting
   public long getLongestWriteLockHeldTimeMs() {
     return omLockMetrics.getLongestWriteLockHeldTimeMs();
@@ -623,10 +647,12 @@ public class OzoneManagerLock {
   /**
    * Unregisters OMLockMetrics source.
    */
+  @Override
   public void cleanup() {
     omLockMetrics.unRegister();
   }
 
+  @Override
   public OMLockMetrics getOMLockMetrics() {
     return omLockMetrics;
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 3e7a756c2a..47880d9035 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -24,6 +24,7 @@ import java.util.List;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.om.IOmMetadataReader;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.DBUpdates;
@@ -32,7 +33,6 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmDeleteKeys;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
-import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -44,7 +44,6 @@ import org.apache.hadoop.ozone.om.helpers.OmRenameKeys;
 import org.apache.hadoop.ozone.om.helpers.OmTenantArgs;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
-import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.S3VolumeContext;
@@ -71,7 +70,7 @@ import org.apache.hadoop.security.token.TokenInfo;
     serverPrincipal = OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY)
 @TokenInfo(OzoneDelegationTokenSelector.class)
 public interface OzoneManagerProtocol
-    extends OzoneManagerSecurityProtocol, Closeable {
+    extends IOmMetadataReader, OzoneManagerSecurityProtocol, Closeable {
 
   @SuppressWarnings("checkstyle:ConstantName")
   /**
@@ -262,15 +261,6 @@ public interface OzoneManagerProtocol
         "this to be implemented, as write requests use a new approach.");
   }
 
-  /**
-   * Look up for the container of an existing key.
-   *
-   * @param args the args of the key.
-   * @return OmKeyInfo instance that client uses to talk to container.
-   * @throws IOException
-   */
-  OmKeyInfo lookupKey(OmKeyArgs args) throws IOException;
-
   /**
    * Rename an existing key within a bucket.
    * @param args the args of the key.
@@ -356,31 +346,6 @@ public interface OzoneManagerProtocol
       String startBucketName, String bucketPrefix, int maxNumOfBuckets)
       throws IOException;
 
-  /**
-   * Returns a list of keys represented by {@link OmKeyInfo}
-   * in the given bucket. Argument volumeName, bucketName is required,
-   * others are optional.
-   *
-   * @param volumeName
-   *   the name of the volume.
-   * @param bucketName
-   *   the name of the bucket.
-   * @param startKeyName
-   *   the start key name, only the keys whose name is
-   *   after this value will be included in the result.
-   * @param keyPrefix
-   *   key name prefix, only the keys whose name has
-   *   this prefix will be included in the result.
-   * @param maxKeys
-   *   the maximum number of keys to return. It ensures
-   *   the size of the result will not exceed this limit.
-   * @return a list of keys.
-   * @throws IOException
-   */
-  List<OmKeyInfo> listKeys(String volumeName,
-      String bucketName, String startKeyName, String keyPrefix, int maxKeys)
-      throws IOException;
-
   /**
    * Returns list of Ozone services with its configuration details.
    *
@@ -701,17 +666,6 @@ public interface OzoneManagerProtocol
    */
   TenantStateList listTenant() throws IOException;
 
-  /**
-   * OzoneFS api to get file status for an entry.
-   *
-   * @param keyArgs Key args
-   * @throws OMException if file does not exist
-   *                     if bucket does not exist
-   * @throws IOException if there is error in the db
-   *                     invalid arguments
-   */
-  OzoneFileStatus getFileStatus(OmKeyArgs keyArgs) throws IOException;
-
   /**
    * Ozone FS api to create a directory. Parent directories if do not exist
    * are created for the input directory.
@@ -749,49 +703,6 @@ public interface OzoneManagerProtocol
   }
 
 
-  /**
-   * OzoneFS api to lookup for a file.
-   *
-   * @param keyArgs Key args
-   * @throws OMException if given key is not found or it is not a file
-   *                     if bucket does not exist
-   * @throws IOException if there is error in the db
-   *                     invalid arguments
-   */
-  OmKeyInfo lookupFile(OmKeyArgs keyArgs) throws IOException;
-
-  /**
-   * List the status for a file or a directory and its contents.
-   *
-   * @param keyArgs    Key args
-   * @param recursive  For a directory if true all the descendants of a
-   *                   particular directory are listed
-   * @param startKey   Key from which listing needs to start. If startKey exists
-   *                   its status is included in the final list.
-   * @param numEntries Number of entries to list from the start key
-   * @return list of file status
-   */
-  List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
-      String startKey, long numEntries) throws IOException;
-
-  /**
-   * List the status for a file or a directory and its contents.
-   *
-   * @param keyArgs    Key args
-   * @param recursive  For a directory if true all the descendants of a
-   *                   particular directory are listed
-   * @param startKey   Key from which listing needs to start. If startKey exists
-   *                   its status is included in the final list.
-   * @param numEntries Number of entries to list from the start key
-   * @param allowPartialPrefixes if partial prefixes should be allowed,
-   *                             this is needed in context of ListKeys
-   * @return list of file status
-   */
-  List<OzoneFileStatus> listStatus(OmKeyArgs keyArgs, boolean recursive,
-                                   String startKey, long numEntries,
-                                   boolean allowPartialPrefixes)
-      throws IOException;
-
   /**
    * Add acl for Ozone object. Return true if acl is added successfully else
    * false.
@@ -833,15 +744,6 @@ public interface OzoneManagerProtocol
         "this to be implemented, as write requests use a new approach.");
   }
 
-
-  /**
-   * Returns list of ACLs for given Ozone object.
-   * @param obj Ozone object.
-   *
-   * @throws IOException if there is error.
-   * */
-  List<OzoneAcl> getAcl(OzoneObj obj) throws IOException;
-
   /**
    * Get DB updates since a specific sequence number.
    * @param dbUpdatesRequest request that encapsulates a sequence number.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
new file mode 100644
index 0000000000..3f5b49ed99
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshot.java
@@ -0,0 +1,420 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.nio.charset.StandardCharsets;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.client.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.client.ReplicationType.RATIS;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.FILE_SYSTEM_OPTIMIZED;
+import static org.apache.hadoop.ozone.om.helpers.BucketLayout.OBJECT_STORE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+
+/**
+ * Test OmSnapshot bucket interface.
+ */
+@RunWith(Parameterized.class)
+public class TestOmSnapshot {
+  private static MiniOzoneCluster cluster = null;
+  private static String volumeName;
+  private static String bucketName;
+  private static OzoneManagerProtocol writeClient;
+  private static BucketLayout bucketLayout = BucketLayout.LEGACY;
+  private static boolean enabledFileSystemPaths;
+  private static ObjectStore store;
+  private static File metaDir;
+  private static OzoneManager ozoneManager;
+
+  @Rule
+  public Timeout timeout = new Timeout(180, TimeUnit.SECONDS);
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+                         new Object[]{OBJECT_STORE, false},
+                         new Object[]{FILE_SYSTEM_OPTIMIZED, false},
+                         new Object[]{BucketLayout.LEGACY, true});
+  }
+
+  public TestOmSnapshot(BucketLayout newBucketLayout,
+      boolean newEnableFileSystemPaths) throws Exception {
+    // Checking whether 'newBucketLayout' and
+    // 'newEnableFileSystemPaths' flags represents next parameter
+    // index values. This is to ensure that initialize init() function
+    // will be invoked only at the beginning of every new set of
+    // Parameterized.Parameters.
+    if (TestOmSnapshot.enabledFileSystemPaths != newEnableFileSystemPaths ||
+            TestOmSnapshot.bucketLayout != newBucketLayout) {
+      setConfig(newBucketLayout, newEnableFileSystemPaths);
+      tearDown();
+      init();
+    }
+  }
+
+  private static void setConfig(BucketLayout newBucketLayout,
+      boolean newEnableFileSystemPaths) {
+    TestOmSnapshot.enabledFileSystemPaths = newEnableFileSystemPaths;
+    TestOmSnapshot.bucketLayout = newBucketLayout;
+  }
+
+  /**
+   * Create a MiniDFSCluster for testing.
+   * <p>
+   *
+   */
+  private void init() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    String omId = UUID.randomUUID().toString();
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+        enabledFileSystemPaths);
+    conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
+        bucketLayout.name());
+    cluster = MiniOzoneCluster.newBuilder(conf).setClusterId(clusterId)
+      .setScmId(scmId).setOmId(omId).build();
+    cluster.waitForClusterToBeReady();
+    // create a volume and a bucket to be used by OzoneFileSystem
+    OzoneBucket bucket = TestDataUtil
+        .createVolumeAndBucket(cluster, bucketLayout);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+
+    OzoneClient client = cluster.getClient();
+    store = client.getObjectStore();
+    writeClient = store.getClientProxy().getOzoneManagerClient();
+    ozoneManager = cluster.getOzoneManager();
+    KeyManagerImpl keyManager = (KeyManagerImpl) HddsWhiteboxTestUtils
+        .getInternalState(ozoneManager, "keyManager");
+
+    // stop the deletion services so that keys can still be read
+    keyManager.stop();
+    metaDir = OMStorage.getOmDbDir(conf);
+
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  // based on TestOzoneRpcClientAbstract:testListKey
+  public void testListKey()
+      throws IOException, InterruptedException, TimeoutException {
+    String volumeA = "vol-a-" + RandomStringUtils.randomNumeric(5);
+    String volumeB = "vol-b-" + RandomStringUtils.randomNumeric(5);
+    String bucketA = "buc-a-" + RandomStringUtils.randomNumeric(5);
+    String bucketB = "buc-b-" + RandomStringUtils.randomNumeric(5);
+    store.createVolume(volumeA);
+    store.createVolume(volumeB);
+    OzoneVolume volA = store.getVolume(volumeA);
+    OzoneVolume volB = store.getVolume(volumeB);
+    volA.createBucket(bucketA);
+    volA.createBucket(bucketB);
+    volB.createBucket(bucketA);
+    volB.createBucket(bucketB);
+    OzoneBucket volAbucketA = volA.getBucket(bucketA);
+    OzoneBucket volAbucketB = volA.getBucket(bucketB);
+    OzoneBucket volBbucketA = volB.getBucket(bucketA);
+    OzoneBucket volBbucketB = volB.getBucket(bucketB);
+
+    /*
+    Create 10 keys in  vol-a-<random>/buc-a-<random>,
+    vol-a-<random>/buc-b-<random>, vol-b-<random>/buc-a-<random> and
+    vol-b-<random>/buc-b-<random>
+     */
+    String keyBaseA = "key-a-";
+    for (int i = 0; i < 10; i++) {
+      byte[] value = RandomStringUtils.randomAscii(10240).getBytes(UTF_8);
+      OzoneOutputStream one = volAbucketA.createKey(
+          keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      one.write(value);
+      one.close();
+      OzoneOutputStream two = volAbucketB.createKey(
+          keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      two.write(value);
+      two.close();
+      OzoneOutputStream three = volBbucketA.createKey(
+          keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      three.write(value);
+      three.close();
+      OzoneOutputStream four = volBbucketB.createKey(
+          keyBaseA + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      four.write(value);
+      four.close();
+    }
+    /*
+    Create 10 keys in  vol-a-<random>/buc-a-<random>,
+    vol-a-<random>/buc-b-<random>, vol-b-<random>/buc-a-<random> and
+    vol-b-<random>/buc-b-<random>
+     */
+    String keyBaseB = "key-b-";
+    for (int i = 0; i < 10; i++) {
+      byte[] value = RandomStringUtils.randomAscii(10240).getBytes(UTF_8);
+      OzoneOutputStream one = volAbucketA.createKey(
+          keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      one.write(value);
+      one.close();
+      OzoneOutputStream two = volAbucketB.createKey(
+          keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      two.write(value);
+      two.close();
+      OzoneOutputStream three = volBbucketA.createKey(
+          keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      three.write(value);
+      three.close();
+      OzoneOutputStream four = volBbucketB.createKey(
+          keyBaseB + i + "-" + RandomStringUtils.randomNumeric(5),
+          value.length, RATIS, ONE,
+          new HashMap<>());
+      four.write(value);
+      four.close();
+    }
+
+
+    String snapshotKeyPrefix = createSnapshot(volumeA, bucketA);
+    Iterator<? extends OzoneKey> volABucketAIter =
+        volAbucketA.listKeys(snapshotKeyPrefix + "key-");
+    int volABucketAKeyCount = 0;
+    while (volABucketAIter.hasNext()) {
+      volABucketAIter.next();
+      volABucketAKeyCount++;
+    }
+    Assert.assertEquals(20, volABucketAKeyCount);
+
+    snapshotKeyPrefix = createSnapshot(volumeA, bucketB);
+    deleteKeys(volAbucketB);
+    Iterator<? extends OzoneKey> volABucketBIter =
+        volAbucketB.listKeys(snapshotKeyPrefix + "key-");
+    int volABucketBKeyCount = 0;
+    while (volABucketBIter.hasNext()) {
+      volABucketBIter.next();
+      volABucketBKeyCount++;
+    }
+    Assert.assertEquals(20, volABucketBKeyCount);
+
+    snapshotKeyPrefix = createSnapshot(volumeB, bucketA);
+    deleteKeys(volBbucketA);
+    Iterator<? extends OzoneKey> volBBucketAIter =
+        volBbucketA.listKeys(snapshotKeyPrefix + "key-");
+    int volBBucketAKeyCount = 0;
+    while (volBBucketAIter.hasNext()) {
+      volBBucketAIter.next();
+      volBBucketAKeyCount++;
+    }
+    Assert.assertEquals(20, volBBucketAKeyCount);
+
+    snapshotKeyPrefix = createSnapshot(volumeB, bucketB);
+    deleteKeys(volBbucketB);
+    Iterator<? extends OzoneKey> volBBucketBIter =
+        volBbucketB.listKeys(snapshotKeyPrefix + "key-");
+    int volBBucketBKeyCount = 0;
+    while (volBBucketBIter.hasNext()) {
+      volBBucketBIter.next();
+      volBBucketBKeyCount++;
+    }
+    Assert.assertEquals(20, volBBucketBKeyCount);
+
+    snapshotKeyPrefix = createSnapshot(volumeA, bucketA);
+    deleteKeys(volAbucketA);
+    Iterator<? extends OzoneKey> volABucketAKeyAIter =
+        volAbucketA.listKeys(snapshotKeyPrefix + "key-a-");
+    int volABucketAKeyACount = 0;
+    while (volABucketAKeyAIter.hasNext()) {
+      volABucketAKeyAIter.next();
+      volABucketAKeyACount++;
+    }
+    Assert.assertEquals(10, volABucketAKeyACount);
+    Iterator<? extends OzoneKey> volABucketAKeyBIter =
+        volAbucketA.listKeys(snapshotKeyPrefix + "key-b-");
+    for (int i = 0; i < 10; i++) {
+      Assert.assertTrue(volABucketAKeyBIter.next().getName()
+          .startsWith(snapshotKeyPrefix + "key-b-" + i + "-"));
+    }
+    Assert.assertFalse(volABucketBIter.hasNext());
+  }
+
+  @Test
+  // based on TestOzoneRpcClientAbstract:testListKeyOnEmptyBucket
+  public void testListKeyOnEmptyBucket()
+      throws IOException, InterruptedException, TimeoutException {
+    String volume = "vol-" + RandomStringUtils.randomNumeric(5);
+    String bucket = "buc-" + RandomStringUtils.randomNumeric(5);
+    store.createVolume(volume);
+    OzoneVolume vol = store.getVolume(volume);
+    vol.createBucket(bucket);
+    String snapshotKeyPrefix = createSnapshot(volume, bucket);
+    OzoneBucket buc = vol.getBucket(bucket);
+    Iterator<? extends OzoneKey> keys = buc.listKeys(snapshotKeyPrefix);
+    while (keys.hasNext()) {
+      fail();
+    }
+  }
+
+  private OmKeyArgs genKeyArgs(String keyName) {
+    return new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setAcls(Collections.emptyList())
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(
+            HddsProtos.ReplicationFactor.ONE))
+        .setLocationInfoList(new ArrayList<>())
+        .build();
+  }
+
+  @Test
+  public void checkKey() throws Exception {
+    OzoneVolume ozoneVolume = store.getVolume(volumeName);
+    assertTrue(ozoneVolume.getName().equals(volumeName));
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
+    assertTrue(ozoneBucket.getName().equals(bucketName));
+
+    String s = "testData";
+    String dir1 = "dir1";
+    String key1 = dir1 + "/key1";
+
+    // create key1
+    OzoneOutputStream ozoneOutputStream =
+            ozoneBucket.createKey(key1, s.length());
+    byte[] input = s.getBytes(StandardCharsets.UTF_8);
+    ozoneOutputStream.write(input);
+    ozoneOutputStream.close();
+
+    String snapshotKeyPrefix = createSnapshot();
+    ozoneBucket.deleteKey(key1);
+    try {
+      ozoneBucket.deleteKey(dir1);
+    } catch (OMException e) {
+      // OBJECT_STORE won't have directory entry so ignore KEY_NOT_FOUND
+      if (e.getResult() != KEY_NOT_FOUND) {
+        fail("got exception on cleanup: " + e.getMessage());
+      }
+    }
+    OmKeyArgs keyArgs = genKeyArgs(snapshotKeyPrefix + key1);
+    
+    OmKeyInfo omKeyInfo = writeClient.lookupKey(keyArgs);
+    assertEquals(omKeyInfo.getKeyName(), snapshotKeyPrefix + key1);
+    
+    OmKeyInfo fileInfo = writeClient.lookupFile(keyArgs);
+    assertEquals(fileInfo.getKeyName(), snapshotKeyPrefix + key1);
+
+    OzoneFileStatus ozoneFileStatus = writeClient.getFileStatus(keyArgs);
+    assertEquals(ozoneFileStatus.getKeyInfo().getKeyName(),
+        snapshotKeyPrefix + key1);
+  }
+
+  private String createSnapshot()
+      throws IOException, InterruptedException, TimeoutException {
+    return createSnapshot(volumeName, bucketName);
+  }
+  private String createSnapshot(String vname, String bname)
+      throws IOException, InterruptedException, TimeoutException {
+    String snapshotName = UUID.randomUUID().toString();
+    writeClient = store.getClientProxy().getOzoneManagerClient();
+    writeClient.createSnapshot(vname, bname, snapshotName);
+    String snapshotKeyPrefix = OmSnapshotManager
+        .getSnapshotPrefix(snapshotName);
+    SnapshotInfo snapshotInfo = ozoneManager
+        .getMetadataManager().getSnapshotInfoTable()
+        .get(SnapshotInfo.getTableKey(vname, bname, snapshotName));
+    String snapshotDirName = metaDir + OM_KEY_PREFIX +
+        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
+        snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
+    GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
+        1000, 120000);
+
+    return snapshotKeyPrefix;
+
+  }
+
+
+  private void deleteKeys(OzoneBucket bucket) throws IOException {
+    Iterator<? extends OzoneKey> bucketIter =
+        bucket.listKeys(null);
+    while (bucketIter.hasNext()) {
+      OzoneKey key = bucketIter.next();
+      bucket.deleteKey(key.getName());
+    }
+  }
+}
+
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java
new file mode 100644
index 0000000000..36424e4a39
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotFileSystem.java
@@ -0,0 +1,581 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.ozone.OzoneFileSystem;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.ObjectStore;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.client.OzoneKey;
+import org.apache.hadoop.ozone.client.OzoneVolume;
+import org.apache.hadoop.ozone.client.io.OzoneInputStream;
+import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.fs.ozone.Constants.LISTING_PAGE_SIZE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_ITERATE_BATCH_SIZE;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * OmSnapshot file system tests.
+ */
+@RunWith(Parameterized.class)
+public class TestOmSnapshotFileSystem {
+  private static MiniOzoneCluster cluster = null;
+  private static OzoneConfiguration conf;
+  private static String volumeName;
+  private static String bucketName;
+  private static FileSystem fs;
+  private static OzoneFileSystem o3fs;
+  private static OzoneManagerProtocol writeClient;
+  private static BucketLayout bucketLayout;
+  private static boolean enabledFileSystemPaths;
+  private static File metaDir;
+  private static OzoneManager ozoneManager;
+  private static String keyPrefix;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestOmSnapshot.class);
+
+
+
+  @Rule
+  public Timeout timeout = new Timeout(120, TimeUnit.SECONDS);
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(
+        new Object[]{BucketLayout.FILE_SYSTEM_OPTIMIZED, false},
+        new Object[]{BucketLayout.LEGACY, true});
+  }
+
+  public TestOmSnapshotFileSystem(BucketLayout newBucketLayout,
+      boolean newEnableFileSystemPaths) throws Exception {
+    // Checking whether 'newBucketLayout' and
+    // 'newEnableFileSystemPaths' flags represents next parameter
+    // index values. This is to ensure that initialize init() function
+    // will be invoked only at the beginning of every new set of
+    // Parameterized.Parameters.
+    if (TestOmSnapshotFileSystem.enabledFileSystemPaths !=
+        newEnableFileSystemPaths ||
+        TestOmSnapshotFileSystem.bucketLayout != newBucketLayout) {
+      setConfig(newBucketLayout, newEnableFileSystemPaths);
+      tearDown();
+      init();
+    }
+  }
+  private static void setConfig(BucketLayout newBucketLayout,
+      boolean newEnableFileSystemPaths) {
+    TestOmSnapshotFileSystem.enabledFileSystemPaths = newEnableFileSystemPaths;
+    TestOmSnapshotFileSystem.bucketLayout = newBucketLayout;
+  }
+  /**
+   * Create a MiniDFSCluster for testing.
+   */
+  private void init() throws Exception {
+    conf = new OzoneConfiguration();
+    String clusterId = UUID.randomUUID().toString();
+    String scmId = UUID.randomUUID().toString();
+    String omId = UUID.randomUUID().toString();
+    conf.setBoolean(OMConfigKeys.OZONE_OM_ENABLE_FILESYSTEM_PATHS,
+        enabledFileSystemPaths);
+    conf.set(OMConfigKeys.OZONE_DEFAULT_BUCKET_LAYOUT,
+        bucketLayout.name());
+    cluster = MiniOzoneCluster.newBuilder(conf).setClusterId(clusterId)
+        .setScmId(scmId).setOmId(omId).build();
+    cluster.waitForClusterToBeReady();
+    // create a volume and a bucket to be used by OzoneFileSystem
+    OzoneBucket bucket = TestDataUtil
+        .createVolumeAndBucket(cluster, bucketLayout);
+    volumeName = bucket.getVolumeName();
+    bucketName = bucket.getName();
+
+    String rootPath = String
+        .format("%s://%s.%s/", OzoneConsts.OZONE_URI_SCHEME, bucket.getName(),
+        bucket.getVolumeName());
+    // Set the fs.defaultFS and start the filesystem
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+    // Set the number of keys to be processed during batch operate.
+    conf.setInt(OZONE_FS_ITERATE_BATCH_SIZE, 5);
+    fs = FileSystem.get(conf);
+    o3fs = (OzoneFileSystem) fs;
+
+    OzoneClient client = cluster.getClient();
+    ObjectStore objectStore = client.getObjectStore();
+    writeClient = objectStore.getClientProxy().getOzoneManagerClient();
+    ozoneManager = cluster.getOzoneManager();
+    metaDir = OMStorage.getOmDbDir(conf);
+
+    // stop the deletion services so that keys can still be read
+    KeyManagerImpl keyManager = (KeyManagerImpl) ozoneManager.getKeyManager();
+    keyManager.stop();
+  }
+
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+    IOUtils.closeQuietly(fs);
+  }
+
+  @Test
+  // based on TestObjectStoreWithFSO:testListKeysAtDifferentLevels
+  public void testListKeysAtDifferentLevels() throws Exception {
+    OzoneClient client = cluster.getClient();
+
+    ObjectStore objectStore = client.getObjectStore();
+    OzoneVolume ozoneVolume = objectStore.getVolume(volumeName);
+    Assert.assertTrue(ozoneVolume.getName().equals(volumeName));
+    OzoneBucket ozoneBucket = ozoneVolume.getBucket(bucketName);
+    Assert.assertTrue(ozoneBucket.getName().equals(bucketName));
+
+    String keyc1 = "/a/b1/c1/c1.tx";
+    String keyc2 = "/a/b1/c2/c2.tx";
+
+    String keyd13 = "/a/b2/d1/d11.tx";
+    String keyd21 = "/a/b2/d2/d21.tx";
+    String keyd22 = "/a/b2/d2/d22.tx";
+    String keyd31 = "/a/b2/d3/d31.tx";
+
+    String keye11 = "/a/b3/e1/e11.tx";
+    String keye21 = "/a/b3/e2/e21.tx";
+    String keye31 = "/a/b3/e3/e31.tx";
+
+    LinkedList<String> keys = new LinkedList<>();
+    keys.add(keyc1);
+    keys.add(keyc2);
+
+    keys.add(keyd13);
+    keys.add(keyd21);
+    keys.add(keyd22);
+    keys.add(keyd31);
+
+    keys.add(keye11);
+    keys.add(keye21);
+    keys.add(keye31);
+
+    int length = 10;
+    byte[] input = new byte[length];
+    Arrays.fill(input, (byte)96);
+
+    createKeys(ozoneBucket, keys);
+
+
+    setKeyPrefix(createSnapshot().substring(1));
+   
+    // Delete the active fs so that we don't inadvertently read it
+    deleteRootDir();
+    // Root level listing keys
+    Iterator<? extends OzoneKey> ozoneKeyIterator =
+        ozoneBucket.listKeys(keyPrefix, null);
+    verifyFullTreeStructure(ozoneKeyIterator);
+
+    ozoneKeyIterator =
+        ozoneBucket.listKeys(keyPrefix + "a/", null);
+    verifyFullTreeStructure(ozoneKeyIterator);
+
+    LinkedList<String> expectedKeys;
+
+    // Intermediate level keyPrefix - 2nd level
+    ozoneKeyIterator =
+        ozoneBucket.listKeys(keyPrefix + "a///b2///", null);
+    expectedKeys = new LinkedList<>();
+    expectedKeys.add("a/b2/");
+    expectedKeys.add("a/b2/d1/");
+    expectedKeys.add("a/b2/d1/d11.tx");
+    expectedKeys.add("a/b2/d2/");
+    expectedKeys.add("a/b2/d2/d21.tx");
+    expectedKeys.add("a/b2/d2/d22.tx");
+    expectedKeys.add("a/b2/d3/");
+    expectedKeys.add("a/b2/d3/d31.tx");
+    checkKeyList(ozoneKeyIterator, expectedKeys);
+
+    // Intermediate level keyPrefix - 3rd level
+    ozoneKeyIterator =
+        ozoneBucket.listKeys(keyPrefix + "a/b2/d1", null);
+    expectedKeys = new LinkedList<>();
+    expectedKeys.add("a/b2/d1/");
+    expectedKeys.add("a/b2/d1/d11.tx");
+    checkKeyList(ozoneKeyIterator, expectedKeys);
+
+    // Boundary of a level
+    ozoneKeyIterator =
+      ozoneBucket.listKeys(keyPrefix + "a/b2/d2", keyPrefix + "a/b2/d2/d21.tx");
+    expectedKeys = new LinkedList<>();
+    expectedKeys.add("a/b2/d2/d22.tx");
+    checkKeyList(ozoneKeyIterator, expectedKeys);
+
+    // Boundary case - last node in the depth-first-traversal
+    ozoneKeyIterator =
+      ozoneBucket.listKeys(keyPrefix + "a/b3/e3", keyPrefix + "a/b3/e3/e31.tx");
+    expectedKeys = new LinkedList<>();
+    checkKeyList(ozoneKeyIterator, expectedKeys);
+  }
+
+  private void verifyFullTreeStructure(Iterator<? extends OzoneKey> keyItr) {
+    LinkedList<String> expectedKeys = new LinkedList<>();
+    expectedKeys.add("a/");
+    expectedKeys.add("a/b1/");
+    expectedKeys.add("a/b1/c1/");
+    expectedKeys.add("a/b1/c1/c1.tx");
+    expectedKeys.add("a/b1/c2/");
+    expectedKeys.add("a/b1/c2/c2.tx");
+    expectedKeys.add("a/b2/");
+    expectedKeys.add("a/b2/d1/");
+    expectedKeys.add("a/b2/d1/d11.tx");
+    expectedKeys.add("a/b2/d2/");
+    expectedKeys.add("a/b2/d2/d21.tx");
+    expectedKeys.add("a/b2/d2/d22.tx");
+    expectedKeys.add("a/b2/d3/");
+    expectedKeys.add("a/b2/d3/d31.tx");
+    expectedKeys.add("a/b3/");
+    expectedKeys.add("a/b3/e1/");
+    expectedKeys.add("a/b3/e1/e11.tx");
+    expectedKeys.add("a/b3/e2/");
+    expectedKeys.add("a/b3/e2/e21.tx");
+    expectedKeys.add("a/b3/e3/");
+    expectedKeys.add("a/b3/e3/e31.tx");
+    checkKeyList(keyItr, expectedKeys);
+  }
+
+  private void checkKeyList(Iterator<? extends OzoneKey > ozoneKeyIterator,
+      List<String> keys) {
+
+    LinkedList<String> outputKeys = new LinkedList<>();
+    while (ozoneKeyIterator.hasNext()) {
+      OzoneKey ozoneKey = ozoneKeyIterator.next();
+      String keyName = ozoneKey.getName();
+      if (keyName.startsWith(keyPrefix)) {
+        keyName = keyName.substring(keyPrefix.length());
+      }
+      outputKeys.add(keyName);
+    }
+    Assert.assertEquals(keys, outputKeys);
+  }
+
+  private void createKeys(OzoneBucket ozoneBucket, List<String> keys)
+      throws Exception {
+    int length = 10;
+    byte[] input = new byte[length];
+    Arrays.fill(input, (byte) 96);
+    for (String key : keys) {
+      createKey(ozoneBucket, key, 10, input);
+    }
+  }
+
+  private void createKey(OzoneBucket ozoneBucket, String key, int length,
+      byte[] input) throws Exception {
+
+    OzoneOutputStream ozoneOutputStream =
+            ozoneBucket.createKey(key, length);
+
+    ozoneOutputStream.write(input);
+    ozoneOutputStream.write(input, 0, 10);
+    ozoneOutputStream.close();
+
+    // Read the key with given key name.
+    OzoneInputStream ozoneInputStream = ozoneBucket.readKey(key);
+    byte[] read = new byte[length];
+    ozoneInputStream.read(read, 0, length);
+    ozoneInputStream.close();
+
+    String inputString = new String(input, StandardCharsets.UTF_8);
+    Assert.assertEquals(inputString, new String(read, StandardCharsets.UTF_8));
+
+    // Read using filesystem.
+    String rootPath = String.format("%s://%s.%s/", OZONE_URI_SCHEME,
+            bucketName, volumeName, StandardCharsets.UTF_8);
+    OzoneFileSystem o3fsNew = (OzoneFileSystem) FileSystem
+        .get(new URI(rootPath), conf);
+    FSDataInputStream fsDataInputStream = o3fsNew.open(new Path(key));
+    read = new byte[length];
+    fsDataInputStream.read(read, 0, length);
+    ozoneInputStream.close();
+
+    Assert.assertEquals(inputString, new String(read, StandardCharsets.UTF_8));
+  }
+
+  private static void setKeyPrefix(String s) {
+    keyPrefix = s;
+  }
+
+  @Test
+  // based on TestOzoneFileSystem:testListStatus
+  public void testListStatus() throws Exception {
+    Path root = new Path("/");
+    Path parent = new Path(root, "/testListStatus");
+    Path file1 = new Path(parent, "key1");
+    Path file2 = new Path(parent, "key2");
+
+    String snapshotKeyPrefix = createSnapshot();
+    Path snapshotRoot = new Path(snapshotKeyPrefix + root);
+    FileStatus[] fileStatuses = o3fs.listStatus(snapshotRoot);
+    Assert.assertEquals("Should be empty", 0, fileStatuses.length);
+
+    ContractTestUtils.touch(fs, file1);
+    ContractTestUtils.touch(fs, file2);
+
+    snapshotKeyPrefix = createSnapshot();
+    snapshotRoot = new Path(snapshotKeyPrefix + root);
+    Path snapshotParent = new Path(snapshotKeyPrefix + parent);
+    fileStatuses = o3fs.listStatus(snapshotRoot);
+    Assert.assertEquals("Should have created parent",
+            1, fileStatuses.length);
+    Assert.assertEquals("Parent path doesn't match",
+            fileStatuses[0].getPath().toUri().getPath(),
+            snapshotParent.toString());
+
+    // ListStatus on a directory should return all subdirs along with
+    // files, even if there exists a file and sub-dir with the same name.
+    fileStatuses = o3fs.listStatus(snapshotParent);
+    assertEquals("FileStatus did not return all children of the directory",
+        2, fileStatuses.length);
+
+    // ListStatus should return only the immediate children of a directory.
+    Path file3 = new Path(parent, "dir1/key3");
+    Path file4 = new Path(parent, "dir1/key4");
+    ContractTestUtils.touch(fs, file3);
+    ContractTestUtils.touch(fs, file4);
+    snapshotKeyPrefix = createSnapshot();
+    snapshotParent = new Path(snapshotKeyPrefix + parent);
+    deleteRootDir();
+    fileStatuses = o3fs.listStatus(snapshotParent);
+    assertEquals("FileStatus did not return all children of the directory",
+        3, fileStatuses.length);
+  }
+
+  @Test
+  // based on TestOzoneFileSystem:testListStatusWithIntermediateDir
+  public void testListStatusWithIntermediateDir() throws Exception {
+    String keyName = "object-dir/object-name";
+    OmKeyArgs keyArgs = new OmKeyArgs.Builder()
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setKeyName(keyName)
+        .setAcls(Collections.emptyList())
+        .setReplicationConfig(StandaloneReplicationConfig.getInstance(ONE))
+        .setLocationInfoList(new ArrayList<>())
+        .build();
+
+    OpenKeySession session = writeClient.openKey(keyArgs);
+    writeClient.commitKey(keyArgs, session.getId());
+
+    Path parent = new Path("/");
+
+    // Wait until the filestatus is updated
+    if (!enabledFileSystemPaths) {
+      GenericTestUtils.waitFor(() -> {
+        try {
+          return fs.listStatus(parent).length != 0;
+        } catch (IOException e) {
+          LOG.error("listStatus() Failed", e);
+          Assert.fail("listStatus() Failed");
+          return false;
+        }
+      }, 1000, 120000);
+    }
+
+    String snapshotKeyPrefix = createSnapshot();
+    deleteRootDir();
+    Path snapshotParent = new Path(snapshotKeyPrefix + parent);
+    FileStatus[] fileStatuses = fs.listStatus(snapshotParent);
+
+    // the number of immediate children of root is 1
+    Assert.assertEquals(1, fileStatuses.length);
+  }
+
+  /**
+   * Tests listStatus operation on root directory.
+   */
+  @Test
+  // based on TestOzoneFileSystem:testListStatusOnRoot
+  public void testListStatusOnRoot() throws Exception {
+    Path root = new Path("/");
+    Path dir1 = new Path(root, "dir1");
+    Path dir12 = new Path(dir1, "dir12");
+    Path dir2 = new Path(root, "dir2");
+    fs.mkdirs(dir12);
+    fs.mkdirs(dir2);
+
+    // ListStatus on root should return dir1 (even though /dir1 key does not
+    // exist) and dir2 only. dir12 is not an immediate child of root and
+    // hence should not be listed.
+    String snapshotKeyPrefix = createSnapshot();
+    deleteRootDir();
+    Path snapshotRoot = new Path(snapshotKeyPrefix + root);
+    FileStatus[] fileStatuses = o3fs.listStatus(snapshotRoot);
+    assertEquals("FileStatus should return only the immediate children",
+        2, fileStatuses.length);
+
+    // Verify that dir12 is not included in the result of the listStatus on root
+    String fileStatus1 = fileStatuses[0].getPath().toUri().getPath();
+    String fileStatus2 = fileStatuses[1].getPath().toUri().getPath();
+    assertNotEquals(fileStatus1, dir12.toString());
+    assertNotEquals(fileStatus2, dir12.toString());
+  }
+
+  /**
+   * Tests listStatus operation on root directory.
+   */
+  @Test
+  // based on TestOzoneFileSystem:testListStatusOnLargeDirectory
+  public void testListStatusOnLargeDirectory() throws Exception {
+    Path root = new Path("/");
+    deleteRootDir(); // cleanup
+    Set<String> paths = new TreeSet<>();
+    int numDirs = LISTING_PAGE_SIZE + LISTING_PAGE_SIZE / 2;
+    for (int i = 0; i < numDirs; i++) {
+      Path p = new Path(root, String.valueOf(i));
+      fs.mkdirs(p);
+      paths.add(p.getName());
+    }
+
+    String snapshotKeyPrefix = createSnapshot();
+    deleteRootDir();
+    Path snapshotRoot = new Path(snapshotKeyPrefix + root);
+    FileStatus[] fileStatuses = o3fs.listStatus(snapshotRoot);
+    // Added logs for debugging failures, to check any sub-path mismatches.
+    Set<String> actualPaths = new TreeSet<>();
+    ArrayList<String> actualPathList = new ArrayList<>();
+    if (numDirs != fileStatuses.length) {
+      for (int i = 0; i < fileStatuses.length; i++) {
+        boolean duplicate =
+                actualPaths.add(fileStatuses[i].getPath().getName());
+        if (!duplicate) {
+          LOG.info("Duplicate path:{} in FileStatusList",
+                  fileStatuses[i].getPath().getName());
+        }
+        actualPathList.add(fileStatuses[i].getPath().getName());
+      }
+      if (numDirs != actualPathList.size()) {
+        LOG.info("actualPathsSize: {}", actualPaths.size());
+        LOG.info("actualPathListSize: {}", actualPathList.size());
+        actualPaths.removeAll(paths);
+        actualPathList.removeAll(paths);
+        LOG.info("actualPaths: {}", actualPaths);
+        LOG.info("actualPathList: {}", actualPathList);
+      }
+    }
+    assertEquals(
+        "Total directories listed do not match the existing directories",
+        numDirs, fileStatuses.length);
+
+    for (int i = 0; i < numDirs; i++) {
+      assertTrue(paths.contains(fileStatuses[i].getPath().getName()));
+    }
+  }
+
+  /**
+   * Cleanup files and directories.
+   *
+   * @throws IOException DB failure
+   */
+  @After
+  public void deleteRootDir()
+      throws IOException, InterruptedException, TimeoutException {
+    Path root = new Path("/");
+    FileStatus[] fileStatuses = fs.listStatus(root);
+
+    if (fileStatuses == null) {
+      return;
+    }
+
+    for (FileStatus fStatus : fileStatuses) {
+      fs.delete(fStatus.getPath(), true);
+    }
+
+
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return fs.listStatus(root).length == 0;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 1000, 120000);
+  }
+
+  private String createSnapshot()
+      throws IOException, InterruptedException, TimeoutException {
+
+    // create snapshot
+    String snapshotName = UUID.randomUUID().toString();
+    writeClient.createSnapshot(volumeName, bucketName, snapshotName);
+
+    // wait till the snapshot directory exists
+    SnapshotInfo snapshotInfo = ozoneManager.getMetadataManager()
+        .getSnapshotInfoTable()
+        .get(SnapshotInfo.getTableKey(volumeName, bucketName, snapshotName));
+    String snapshotDirName = metaDir + OM_KEY_PREFIX +
+        OM_SNAPSHOT_DIR + OM_KEY_PREFIX + OM_DB_NAME +
+        snapshotInfo.getCheckpointDirName() + OM_KEY_PREFIX + "CURRENT";
+    GenericTestUtils.waitFor(() -> new File(snapshotDirName).exists(),
+        1000, 120000);
+
+    return OM_KEY_PREFIX + OmSnapshotManager.getSnapshotPrefix(snapshotName);
+  }
+}
diff --git a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index d92110775a..fe5ab879d8 100644
--- a/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/interface-storage/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
-import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKeyBucket;
 import org.apache.hadoop.ozone.storage.proto.
@@ -83,7 +83,7 @@ public interface OMMetadataManager extends DBStoreHAManager {
    *
    * @return OzoneManagerLock
    */
-  OzoneManagerLock getLock();
+  IOzoneManagerLock getLock();
 
   /**
    * Returns the epoch associated with current OM process.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index cdaef81a15..75d28353ff 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -79,6 +79,8 @@ import org.apache.hadoop.ozone.om.helpers.RepeatedOmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.apache.hadoop.ozone.om.lock.IOzoneManagerLock;
+import org.apache.hadoop.ozone.om.lock.OmReadOnlyLock;
 import org.apache.hadoop.ozone.om.lock.OzoneManagerLock;
 import org.apache.hadoop.hdds.utils.TransactionInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OpenKey;
@@ -98,6 +100,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_L
 import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.BUCKET_NOT_FOUND;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
 
 import org.apache.ratis.util.ExitUtils;
 import org.eclipse.jetty.util.StringUtil;
@@ -225,7 +228,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   private DBStore store;
 
-  private final OzoneManagerLock lock;
+  private final IOzoneManagerLock lock;
 
   private Table userTable;
   private Table volumeTable;
@@ -288,6 +291,33 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     this.omEpoch = 0;
   }
 
+  // metadata constructor for snapshots
+  private OmMetadataManagerImpl(OzoneConfiguration conf, String snapshotDirName)
+      throws IOException {
+    lock = new OmReadOnlyLock();
+    omEpoch = 0;
+    String snapshotDir = OMStorage.getOmDbDir(conf) +
+        OM_KEY_PREFIX + OM_SNAPSHOT_DIR;
+    setStore(loadDB(conf, new File(snapshotDir),
+        OM_DB_NAME + snapshotDirName, true));
+    initializeOmTables();
+  }
+
+  /**
+   * Factory method for creating snapshot metadata manager.
+   *
+   * @param conf - ozone configuration
+   * @param snapshotDirName - the UUID that identifies the snapshot
+   * @return the metadata manager representing the snapshot
+   * @throws IOException
+   */
+  public static OmMetadataManagerImpl createSnapshotMetadataManager(
+      OzoneConfiguration conf, String snapshotDirName) throws IOException {
+    OmMetadataManagerImpl smm = new OmMetadataManagerImpl(conf,
+        snapshotDirName);
+    return smm;
+  }
+
   @Override
   public Table<String, PersistedUserVolumeInfo> getUserTable() {
     return userTable;
@@ -415,17 +445,19 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
 
   public static DBStore loadDB(OzoneConfiguration configuration, File metaDir)
       throws IOException {
-    return loadDB(configuration, metaDir, OM_DB_NAME);
+    return loadDB(configuration, metaDir, OM_DB_NAME, false);
   }
 
   public static DBStore loadDB(OzoneConfiguration configuration, File metaDir,
-      String dbName) throws IOException {
+      String dbName, boolean readOnly) throws IOException {
+
     final int maxFSSnapshots = configuration.getInt(
         OZONE_OM_FS_SNAPSHOT_MAX_LIMIT, OZONE_OM_FS_SNAPSHOT_MAX_LIMIT_DEFAULT);
     RocksDBConfiguration rocksDBConfiguration =
         configuration.getObject(RocksDBConfiguration.class);
     DBStoreBuilder dbStoreBuilder = DBStoreBuilder.newBuilder(configuration,
         rocksDBConfiguration).setName(dbName)
+        .setOpenReadOnly(readOnly)
         .setPath(Paths.get(metaDir.getPath()))
         .setMaxFSSnapshots(maxFSSnapshots);
     DBStore dbStore = addOMTablesAndCodecs(dbStoreBuilder).build();
@@ -674,7 +706,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * @return OzoneManagerLock
    */
   @Override
-  public org.apache.hadoop.ozone.om.lock.OzoneManagerLock getLock() {
+  public IOzoneManagerLock getLock() {
     return lock;
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
index 8f541a578b..d3a14e267f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataReader.java
@@ -64,12 +64,11 @@ import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
  * This abstraction manages all the metadata key/acl reading
  * from a rocksDb instance, for both the OM and OM snapshots.
  */
-public class OmMetadataReader implements Auditor {
+public class OmMetadataReader implements IOmMetadataReader, Auditor {
   private final KeyManager keyManager;
   private final PrefixManager prefixManager;
   private final VolumeManager volumeManager;
   private final BucketManager bucketManager;
-  private final OMMetadataManager metadataManager;
   private final OzoneManager ozoneManager;
   private final boolean isAclEnabled;
   private final IAccessAuthorizer accessAuthorizer;
@@ -80,7 +79,6 @@ public class OmMetadataReader implements Auditor {
 
   public OmMetadataReader(KeyManager keyManager,
                    PrefixManager prefixManager,
-                   OMMetadataManager metadataManager,
                    OzoneManager ozoneManager,
                    Logger log,
                    AuditLogger audit,
@@ -89,7 +87,6 @@ public class OmMetadataReader implements Auditor {
     this.bucketManager = ozoneManager.getBucketManager();
     this.volumeManager = ozoneManager.getVolumeManager();
     this.prefixManager = prefixManager;
-    this.metadataManager = metadataManager;
     OzoneConfiguration configuration = ozoneManager.getConfiguration();
     this.ozoneManager = ozoneManager;
     this.isAclEnabled = ozoneManager.getAclsEnabled();
@@ -125,6 +122,7 @@ public class OmMetadataReader implements Auditor {
    * @return OmKeyInfo - the info about the requested key.
    * @throws IOException
    */
+  @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
     ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 
@@ -155,6 +153,7 @@ public class OmMetadataReader implements Auditor {
     }
   }
 
+  @Override
   public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
       String startKey, long numEntries, boolean allowPartialPrefixes)
       throws IOException {
@@ -189,6 +188,7 @@ public class OmMetadataReader implements Auditor {
     }
   }
   
+  @Override
   public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
     ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 
@@ -214,6 +214,7 @@ public class OmMetadataReader implements Auditor {
     }
   }
 
+  @Override
   public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
     ResolvedBucket bucket = ozoneManager.resolveBucketLink(args);
 
@@ -244,6 +245,7 @@ public class OmMetadataReader implements Auditor {
     }
   }
 
+  @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
 
@@ -446,6 +448,7 @@ public class OmMetadataReader implements Auditor {
     return clientMachine;
   }
 
+  @Override
   public AuditMessage buildAuditMessageForSuccess(AuditAction op,
       Map<String, String> auditMap) {
 
@@ -458,6 +461,7 @@ public class OmMetadataReader implements Auditor {
         .build();
   }
 
+  @Override
   public AuditMessage buildAuditMessageForFailure(AuditAction op,
       Map<String, String> auditMap, Throwable throwable) {
 
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java
new file mode 100644
index 0000000000..fa708e6a09
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshot.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.ozone.OzoneAcl;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditLoggerType;
+import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
+import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
+import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.ozone.security.acl.OzoneObjInfo;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Metadata Reading class for OM Snapshots.
+ *
+ * This abstraction manages all the metadata key/acl reading from a
+ * rocksDb instance, for OM snapshots.  It's basically identical to
+ * the ozoneManager OmMetadataReader with two exceptions: 
+ *
+ * 1. Its keymanager and prefix manager contain an OmMetadataManager
+ * that reads from a snapshot.  
+ *
+ * 2. It normalizes/denormalizes each request as it comes in to
+ * remove/replace the ".snapshot/snapshotName" prefix.
+ */
+public class OmSnapshot implements IOmMetadataReader, Closeable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OmSnapshot.class);
+
+  private static final AuditLogger AUDIT = new AuditLogger(
+      AuditLoggerType.OMLOGGER);
+
+  private final OmMetadataReader omMetadataReader;
+  private final String volumeName;
+  private final String bucketName;
+  private final String snapshotName;
+  private final OMMetadataManager omMetadataManager;
+
+  public OmSnapshot(KeyManager keyManager,
+                    PrefixManager prefixManager,
+                    OzoneManager ozoneManager,
+                    String volumeName,
+                    String bucketName,
+                    String snapshotName) {
+    omMetadataReader = new OmMetadataReader(keyManager, prefixManager,
+        ozoneManager, LOG, AUDIT,
+        OmSnapshotMetrics.getInstance());
+    this.snapshotName = snapshotName;
+    this.bucketName = bucketName;
+    this.volumeName = volumeName;
+    this.omMetadataManager = keyManager.getMetadataManager();
+  }
+
+
+  @Override
+  public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
+    return denormalizeOmKeyInfo(omMetadataReader.lookupKey(
+        normalizeOmKeyArgs(args)));
+  }
+
+  @Override
+  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
+      String startKey, long numEntries, boolean allowPartialPrefixes)
+      throws IOException {
+    List<OzoneFileStatus> l = omMetadataReader
+        .listStatus(normalizeOmKeyArgs(args),
+        recursive, normalizeKeyName(startKey), numEntries,
+        allowPartialPrefixes);
+    return l.stream().map(this::denormalizeOzoneFileStatus)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
+    return denormalizeOzoneFileStatus(
+        omMetadataReader.getFileStatus(normalizeOmKeyArgs(args)));
+  }
+
+  @Override
+  public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
+    return denormalizeOmKeyInfo(omMetadataReader
+        .lookupFile(normalizeOmKeyArgs(args)));
+  }
+
+  @Override
+  public List<OmKeyInfo> listKeys(String vname, String bname,
+      String startKey, String keyPrefix, int maxKeys) throws IOException {
+    List<OmKeyInfo> l = omMetadataReader.listKeys(vname, bname,
+        normalizeKeyName(startKey), normalizeKeyName(keyPrefix), maxKeys);
+    return l.stream().map(this::denormalizeOmKeyInfo)
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
+    // TODO: handle denormalization
+    return omMetadataReader.getAcl(normalizeOzoneObj(obj));
+  }
+
+  private OzoneObj normalizeOzoneObj(OzoneObj o) {
+    if (o == null) {
+      return null;
+    }
+
+    return OzoneObjInfo.Builder.getBuilder(o.getResourceType(),
+        o.getStoreType(), o.getVolumeName(), o.getBucketName(),
+        normalizeKeyName(o.getKeyName()))
+        // OzonePrefixPath field appears to only used by fso
+        //  delete/rename requests which are not applicable to
+        //  snapshots
+        .setOzonePrefixPath(o.getOzonePrefixPathViewer()).build();
+
+  }
+
+
+  // Remove snapshot indicator from keyname
+  private String normalizeKeyName(String keyname) {
+    if (keyname == null) {
+      return null;
+    }
+    String[] keyParts = keyname.split("/");
+    if (OmSnapshotManager.isSnapshotKey(keyParts)) {
+      // ".snapshot/name/" becomes ""
+      if (keyParts.length == 2) {
+        return "";
+      }
+      // ".snapshot/name/key/" becomes "key/"
+      String normalizedKeyName = String.join("/",
+          Arrays.copyOfRange(keyParts, 2, keyParts.length));
+      if (keyname.endsWith("/")) {
+        normalizedKeyName = normalizedKeyName + "/";
+      }
+      return normalizedKeyName;
+    }
+    return keyname;
+  }
+
+  // Restore snapshot indicator to keyanme
+  private String denormalizeKeyName(String keyname) {
+    if (keyname == null) {
+      return null;
+    }
+    return OmSnapshotManager.getSnapshotPrefix(snapshotName) + keyname;
+  }
+
+  private OmKeyInfo denormalizeOmKeyInfo(OmKeyInfo keyInfo) {
+    if (keyInfo == null) {
+      return null;
+    }
+    OmKeyInfo denormalized = keyInfo.copyObject();
+    denormalized.setKeyName(denormalizeKeyName(keyInfo.getKeyName()));
+    return denormalized;
+  }
+
+  private OmKeyArgs normalizeOmKeyArgs(OmKeyArgs args) {
+    if (args == null) {
+      return null;
+    }
+    return args.toBuilder().setKeyName(normalizeKeyName(
+        args.getKeyName())).build();
+  }
+
+  private  OzoneFileStatus denormalizeOzoneFileStatus(
+      OzoneFileStatus fileStatus) {
+    if (fileStatus == null) {
+      return null;
+    }
+    OmKeyInfo omKeyInfo;
+    // if this is the filestatus for the whole bucket
+    if (fileStatus.getKeyInfo() == null) {
+      // denormalization requires that the keyname in the filestatus
+      // keyInfo struct be updated to include the snapshot indicator.
+      // But the bucket filestatus has a null keyInfo struct.
+
+      //  so this code adds a dummy keyinfo struct just for
+      //  denormalization.
+      //  See KeyManagerImpl.getOzoneFileStatus()
+      omKeyInfo = createDenormalizedBucketKeyInfo();
+    } else {
+      omKeyInfo = denormalizeOmKeyInfo(fileStatus.getKeyInfo());
+    }
+    return new OzoneFileStatus(
+        omKeyInfo, fileStatus.getBlockSize(), fileStatus.isDirectory());
+  }
+
+  private OmKeyInfo createDenormalizedBucketKeyInfo() {
+    return new OmKeyInfo.Builder()
+      .setVolumeName(volumeName)
+      .setBucketName(bucketName)
+      .setKeyName(denormalizeKeyName(""))
+      .setOmKeyLocationInfos(Collections.singletonList(
+          new OmKeyLocationInfoGroup(0, new ArrayList<>())))
+      .setCreationTime(Time.now())
+      .setModificationTime(Time.now())
+      .setDataSize(0)
+      .setReplicationConfig(RatisReplicationConfig
+          .getInstance(HddsProtos.ReplicationFactor.ZERO))
+      .build();
+  }
+
+  @Override
+  public void close() throws IOException {
+    omMetadataManager.getStore().close();
+  }
+
+  @VisibleForTesting
+  public OMMetadataManager getMetadataManager() {
+    return omMetadataManager;
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
new file mode 100644
index 0000000000..dcbed30bd4
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotManager.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_KEY_NAME;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.KEY_NOT_FOUND;
+
+
+/**
+ * This class is used to manage/create OM snapshots.
+ */
+public final class OmSnapshotManager {
+  private final OzoneManager ozoneManager;
+  private final LoadingCache<String, OmSnapshot> snapshotCache;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OmSnapshotManager.class);
+
+  OmSnapshotManager(OzoneManager ozoneManager) {
+    this.ozoneManager = ozoneManager;
+
+    // size of lru cache
+    int cacheSize = ozoneManager.getConfiguration().getInt(
+        OzoneConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE,
+        OzoneConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE_DEFAULT);
+
+    CacheLoader<String, OmSnapshot> loader;
+    loader = new CacheLoader<String, OmSnapshot>() {
+      @Override
+
+      // load the snapshot into the cache if not already there
+      @Nonnull
+      public OmSnapshot load(@Nonnull String snapshotTableKey)
+          throws IOException {
+        SnapshotInfo snapshotInfo;
+        // see if the snapshot exists
+        snapshotInfo = getSnapshotInfo(snapshotTableKey);
+
+        // read in the snapshot
+        OzoneConfiguration conf = ozoneManager.getConfiguration();
+        OMMetadataManager snapshotMetadataManager;
+
+        // Create the snapshot metadata manager by finding the corresponding
+        // RocksDB instance, creating an OmMetadataManagerImpl instance based on
+        // that
+        try {
+          snapshotMetadataManager = OmMetadataManagerImpl
+              .createSnapshotMetadataManager(
+              conf, snapshotInfo.getCheckpointDirName());
+        } catch (IOException e) {
+          LOG.error("Failed to retrieve snapshot: {}, {}", snapshotTableKey, e);
+          throw e;
+        }
+
+        // create the other manager instances based on snapshot metadataManager
+        PrefixManagerImpl pm = new PrefixManagerImpl(snapshotMetadataManager,
+            false);
+        KeyManagerImpl km = new KeyManagerImpl(null,
+            ozoneManager.getScmClient(), snapshotMetadataManager, conf, null,
+            ozoneManager.getBlockTokenSecretManager(),
+            ozoneManager.getKmsProvider(), pm);
+
+        return new OmSnapshot(km, pm, ozoneManager,
+            snapshotInfo.getVolumeName(),
+            snapshotInfo.getBucketName(),
+            snapshotInfo.getName());
+      }
+    };
+
+    RemovalListener<String, OmSnapshot> removalListener
+        = notification -> {
+          try {
+            // close snapshot's rocksdb on eviction
+            notification.getValue().close();
+          } catch (IOException e) {
+            LOG.error("Failed to close snapshot: {} {}",
+                notification.getKey(), e);
+          }
+        };
+    // init LRU cache
+    snapshotCache = CacheBuilder.newBuilder()
+        .maximumSize(cacheSize)
+        .removalListener(removalListener)
+        .build(loader);
+  }
+
+  /**
+   * Creates snapshot checkpoint that corresponds to snapshotInfo.
+   * @param omMetadataManager the metadata manager
+   * @param snapshotInfo The metadata of snapshot to be created
+   * @return instance of DBCheckpoint
+   */
+  public static DBCheckpoint createOmSnapshotCheckpoint(
+      OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo)
+      throws IOException {
+    RDBStore store = (RDBStore) omMetadataManager.getStore();
+    return store.getSnapshot(snapshotInfo.getCheckpointDirName());
+  }
+
+  // Get OmSnapshot if the keyname has ".snapshot" key indicator
+  public IOmMetadataReader checkForSnapshot(String volumeName,
+                                     String bucketName, String keyname)
+      throws IOException {
+    if (keyname == null) {
+      return ozoneManager.getOmMetadataReader();
+    }
+
+    // see if key is for a snapshot
+    String[] keyParts = keyname.split("/");
+    if (isSnapshotKey(keyParts)) {
+      String snapshotName = keyParts[1];
+      if (snapshotName == null || snapshotName.isEmpty()) {
+        // don't allow snapshot indicator without snapshot name
+        throw new OMException(INVALID_KEY_NAME);
+      }
+      String snapshotTableKey = SnapshotInfo.getTableKey(volumeName,
+          bucketName, snapshotName);
+
+      // retrieve the snapshot from the cache
+      try {
+        return snapshotCache.get(snapshotTableKey);
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      }
+    } else {
+      return ozoneManager.getOmMetadataReader();
+    }
+  }
+
+  public SnapshotInfo getSnapshotInfo(String volumeName,
+                                      String bucketName, String snapshotName)
+      throws IOException {
+    return getSnapshotInfo(SnapshotInfo.getTableKey(volumeName,
+        bucketName, snapshotName));
+  }
+
+  private SnapshotInfo getSnapshotInfo(String key) throws IOException {
+    SnapshotInfo snapshotInfo;
+    try {
+      snapshotInfo = ozoneManager.getMetadataManager()
+        .getSnapshotInfoTable()
+        .get(key);
+    } catch (IOException e) {
+      LOG.error("Snapshot {}: not found: {}", key, e);
+      throw e;
+    }
+    if (snapshotInfo == null) {
+      throw new OMException(KEY_NOT_FOUND);
+    }
+    return snapshotInfo;
+  }
+
+  public static String getSnapshotPrefix(String snapshotName) {
+    return OM_SNAPSHOT_INDICATOR + OM_KEY_PREFIX +
+        snapshotName + OM_KEY_PREFIX;
+  }
+
+  public static boolean isSnapshotKey(String[] keyParts) {
+    return (keyParts.length > 1) &&
+        (keyParts[0].compareTo(OM_SNAPSHOT_INDICATOR) == 0);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotMetrics.java
new file mode 100644
index 0000000000..c228af1c3a
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmSnapshotMetrics.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.annotation.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class is for maintaining Snapshot Manager statistics.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Snapshot Manager Metrics", context = "dfs")
+public final class OmSnapshotMetrics implements OmMetadataReaderMetrics {
+  private static final String SOURCE_NAME =
+      OmSnapshotMetrics.class.getSimpleName();
+
+  private OmSnapshotMetrics() {
+  }
+
+  private static OmSnapshotMetrics instance;
+  public static OmSnapshotMetrics getInstance() {
+    if (instance != null) {
+      return instance;
+    }
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    instance = ms.register(SOURCE_NAME,
+        "Snapshot Manager Metrics",
+        new OmSnapshotMetrics());
+    return instance;
+  }
+
+  private @Metric
+      MutableCounterLong numKeyLookup;
+  private @Metric
+      MutableCounterLong numKeyLookupFails;
+
+  @Override
+  public void incNumKeyLookups() {
+    numKeyOps.incr();
+    numKeyLookup.incr();
+  }
+
+  @Override
+  public void incNumKeyLookupFails() {
+    numKeyLookupFails.incr();
+  }
+
+  private @Metric
+      MutableCounterLong numListStatus;
+  private @Metric
+      MutableCounterLong numListStatusFails;
+
+  @Override
+  public void incNumListStatus() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numListStatus.incr();
+  }
+
+  @Override
+  public void incNumListStatusFails() {
+    numListStatusFails.incr();
+  }
+
+  private @Metric
+      MutableCounterLong numGetFileStatus;
+  private @Metric
+      MutableCounterLong numGetFileStatusFails;
+
+  @Override
+  public void incNumGetFileStatus() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numGetFileStatus.incr();
+  }
+
+  @Override
+  public void incNumGetFileStatusFails() {
+    numGetFileStatusFails.incr();
+  }
+
+  private @Metric
+      MutableCounterLong numLookupFile;
+  private @Metric
+      MutableCounterLong numLookupFileFails;
+
+  @Override
+  public void incNumLookupFile() {
+    numKeyOps.incr();
+    numFSOps.incr();
+    numLookupFile.incr();
+  }
+
+  @Override
+  public void incNumLookupFileFails() {
+    numLookupFileFails.incr();
+  }
+
+  private @Metric
+      MutableCounterLong numKeyLists;
+
+  private @Metric
+      MutableCounterLong numKeyListFails;
+
+  @Override
+  public void incNumKeyLists() {
+    numKeyLists.incr();
+  }
+
+  @Override
+  public void incNumKeyListFails() {
+    numKeyListFails.incr();
+  }
+
+  private @Metric
+      MutableCounterLong numGetAcl;
+
+  @Override
+  public void incNumGetAcl() {
+    numGetAcl.incr();
+  }
+
+  private @Metric
+      MutableCounterLong numKeyOps;
+  private @Metric
+      MutableCounterLong numFSOps;
+}
+
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 8d185242a2..9664e602f4 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -427,6 +427,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   // This metadata reader points to the active filesystem
   private OmMetadataReader omMetadataReader;
+  private OmSnapshotManager omSnapshotManager;
 
   @SuppressWarnings("methodlength")
   private OzoneManager(OzoneConfiguration conf, StartupOption startupOption)
@@ -706,7 +707,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     keyManager = new KeyManagerImpl(this, scmClient, configuration,
         omStorage.getOmId());
     omMetadataReader = new OmMetadataReader(keyManager, prefixManager,
-        metadataManager, this, LOG, AUDIT, metrics);
+        this, LOG, AUDIT, metrics);
+    omSnapshotManager = new OmSnapshotManager(this);
 
     if (withNewSnapshot) {
       Integer layoutVersionInDB = getLayoutVersionInDB();
@@ -2718,14 +2720,14 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public OmKeyInfo lookupKey(OmKeyArgs args) throws IOException {
-    return omMetadataReader.lookupKey(args);
+    return getReader(args).lookupKey(args);
   }
 
   @Override
   public List<OmKeyInfo> listKeys(String volumeName, String bucketName,
       String startKey, String keyPrefix, int maxKeys) throws IOException {
-    return omMetadataReader.listKeys(volumeName, bucketName,
-        startKey, keyPrefix, maxKeys);
+    return getReader(volumeName, bucketName, keyPrefix).listKeys(
+        volumeName, bucketName, startKey, keyPrefix, maxKeys);
   }
 
   @Override
@@ -3297,26 +3299,20 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
 
   @Override
   public OzoneFileStatus getFileStatus(OmKeyArgs args) throws IOException {
-    return omMetadataReader.getFileStatus(args);
+    return getReader(args).getFileStatus(args);
   }
 
   @Override
   public OmKeyInfo lookupFile(OmKeyArgs args) throws IOException {
-    return omMetadataReader.lookupFile(args);
+    return getReader(args).lookupFile(args);
   }
 
   @Override
-  public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
-      String startKey, long numEntries)
-      throws IOException {
-    return listStatus(args, recursive, startKey, numEntries, false);
-  }
-
   public List<OzoneFileStatus> listStatus(OmKeyArgs args, boolean recursive,
       String startKey, long numEntries, boolean allowPartialPrefixes)
       throws IOException {
 
-    return omMetadataReader.listStatus(args, recursive,
+    return getReader(args).listStatus(args, recursive,
         startKey, numEntries, allowPartialPrefixes);
   }
 
@@ -3328,7 +3324,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    */
   @Override
   public List<OzoneAcl> getAcl(OzoneObj obj) throws IOException {
-    return omMetadataReader.getAcl(obj);
+    return getReader(obj).getAcl(obj);
   }
 
   /**
@@ -4199,4 +4195,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private BucketLayout getBucketLayout() {
     return BucketLayout.DEFAULT;
   }
+  private IOmMetadataReader getReader(OmKeyArgs keyArgs) throws IOException {
+    return omSnapshotManager.checkForSnapshot(
+        keyArgs.getVolumeName(), keyArgs.getBucketName(), keyArgs.getKeyName());
+  }
+
+  private IOmMetadataReader getReader(String volumeName, String bucketName,
+      String key) throws IOException {
+    return omSnapshotManager.checkForSnapshot(volumeName, bucketName, key);
+  }
+
+  private IOmMetadataReader getReader(OzoneObj ozoneObj) throws IOException {
+    return omSnapshotManager.checkForSnapshot(
+        ozoneObj.getVolumeName(), ozoneObj.getBucketName(),
+        ozoneObj.getKeyName());
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotManager.java
deleted file mode 100644
index 31af5e19eb..0000000000
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotManager.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om;
-
-import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
-import org.apache.hadoop.hdds.utils.db.RDBStore;
-import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
-
-import java.io.IOException;
-
-/**
- * This class is used to manage/create OM snapshots.
- */
-public final class SnapshotManager {
-
-  
-  /**
-   * Creates snapshot checkpoint that corresponds with SnapshotInfo.
-   * @param OMMetadataManager the metadata manager
-   * @param snapshotInfo The metadata of snapshot to be created
-   * @return instance of DBCheckpoint
-   */
-  public static DBCheckpoint createSnapshot(
-      OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo)
-      throws IOException {
-    RDBStore store = (RDBStore) omMetadataManager.getStore();
-    return store.getSnapshot(snapshotInfo.getCheckpointDirName());
-  }
-
-  private SnapshotManager() { }
-
-}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java
index 5809182fef..f59588ba0f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java
@@ -20,7 +20,7 @@ package org.apache.hadoop.ozone.om.response.snapshot;
 
 import org.apache.hadoop.hdds.utils.db.BatchOperation;
 import org.apache.hadoop.ozone.om.OMMetadataManager;
-import org.apache.hadoop.ozone.om.SnapshotManager;
+import org.apache.hadoop.ozone.om.OmSnapshotManager;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
 import org.apache.hadoop.ozone.om.response.OMClientResponse;
@@ -61,7 +61,8 @@ public class OMSnapshotCreateResponse extends OMClientResponse {
         getOMResponse().getCreateSnapshotResponse().getSnapshotInfo());
 
     // Create the snapshot checkpoint
-    SnapshotManager.createSnapshot(omMetadataManager, snapshotInfo);
+    OmSnapshotManager.createOmSnapshotCheckpoint(omMetadataManager,
+        snapshotInfo);
 
     String key = snapshotInfo.getTableKey();
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
new file mode 100644
index 0000000000..3a89ed6973
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestOmSnapshotManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.HddsWhiteboxTestUtils;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.UUID;
+
+import static org.apache.hadoop.ozone.om.OmSnapshotManager.getSnapshotPrefix;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test om snapshot manager.
+ */
+public class TestOmSnapshotManager {
+
+  private OzoneManager om;
+  private File testDir;
+
+  @Before
+  public void init() throws Exception {
+    OzoneConfiguration configuration = new OzoneConfiguration();
+    testDir = GenericTestUtils.getRandomizedTestDir();
+    configuration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        testDir.toString());
+
+    // Only allow one entry in cache so each new one causes an eviction
+    configuration.setInt(
+        OzoneConfigKeys.OZONE_OM_SNAPSHOT_CACHE_MAX_SIZE, 1);
+
+    OmTestManagers omTestManagers = new OmTestManagers(configuration);
+    om = omTestManagers.getOzoneManager();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    om.stop();
+    FileUtils.deleteDirectory(testDir);
+  }
+
+  @Test
+  public void testCloseOnEviction() throws IOException {
+
+    // set up db table
+    SnapshotInfo first = createSnapshotInfo();
+    SnapshotInfo second = createSnapshotInfo();
+    Table<String, SnapshotInfo> snapshotInfoTable = mock(Table.class);
+    when(snapshotInfoTable.get(first.getTableKey())).thenReturn(first);
+    when(snapshotInfoTable.get(second.getTableKey())).thenReturn(second);
+    HddsWhiteboxTestUtils.setInternalState(
+        om.getMetadataManager(), "snapshotInfoTable", snapshotInfoTable);
+
+    // create the first snapshot checkpoint
+    OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(),
+        first);
+
+    // retrieve it and setup store mock
+    OmSnapshotManager omSnapshotManager = new OmSnapshotManager(om);
+    OmSnapshot firstSnapshot = (OmSnapshot) omSnapshotManager
+        .checkForSnapshot(first.getVolumeName(),
+        first.getBucketName(), getSnapshotPrefix(first.getName()));
+    DBStore firstSnapshotStore = mock(DBStore.class);
+    HddsWhiteboxTestUtils.setInternalState(
+        firstSnapshot.getMetadataManager(), "store", firstSnapshotStore);
+
+    //  create second snapshot checkpoint (which will be used for eviction)
+    OmSnapshotManager.createOmSnapshotCheckpoint(om.getMetadataManager(),
+        second);
+
+    // confirm store not yet closed
+    verify(firstSnapshotStore, times(0)).close();
+
+    // read in second snapshot to evict first
+    omSnapshotManager
+        .checkForSnapshot(second.getVolumeName(),
+        second.getBucketName(), getSnapshotPrefix(second.getName()));
+
+
+    // confirm store was closed
+    verify(firstSnapshotStore, timeout(3000).times(1)).close();
+  }
+
+  private SnapshotInfo createSnapshotInfo() {
+    String snapshotName = UUID.randomUUID().toString();
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    return SnapshotInfo.newInstance(
+        volumeName, bucketName, snapshotName);
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
index 37baf0ccf7..dd3cc2c14b 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
 import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -89,7 +88,7 @@ public class TestOMSnapshotCreateRequest {
     when(ozoneManager.getMetrics()).thenReturn(omMetrics);
     when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
     when(ozoneManager.isRatisEnabled()).thenReturn(true);
-    when(ozoneManager.isAdmin((UserGroupInformation) any())).thenReturn(false);
+    when(ozoneManager.isAdmin(any())).thenReturn(false);
     when(ozoneManager.isOwner(any(), any())).thenReturn(false);
     when(ozoneManager.getBucketOwner(any(), any(),
         any(), any())).thenReturn("dummyBucketOwner");
@@ -151,14 +150,14 @@ public class TestOMSnapshotCreateRequest {
   
   @Test
   public void testValidateAndUpdateCache() throws Exception {
-    when(ozoneManager.isAdmin((UserGroupInformation) any())).thenReturn(true);
+    when(ozoneManager.isAdmin(any())).thenReturn(true);
     OMRequest omRequest =
         OMRequestTestUtils.createSnapshotRequest(
         volumeName, bucketName, snapshotName);
     OMSnapshotCreateRequest omSnapshotCreateRequest =
         doPreExecute(omRequest);
-    String key = SnapshotInfo.newInstance(volumeName,
-        bucketName, snapshotName).getTableKey();
+    String key = SnapshotInfo.getTableKey(volumeName,
+        bucketName, snapshotName);
 
     // As we have not still called validateAndUpdateCache, get() should
     // return null.
@@ -190,13 +189,13 @@ public class TestOMSnapshotCreateRequest {
 
   @Test
   public void testEntryExists() throws Exception {
-    when(ozoneManager.isAdmin((UserGroupInformation) any())).thenReturn(true);
+    when(ozoneManager.isAdmin(any())).thenReturn(true);
     OMRequest omRequest =
         OMRequestTestUtils.createSnapshotRequest(
         volumeName, bucketName, snapshotName);
     OMSnapshotCreateRequest omSnapshotCreateRequest = doPreExecute(omRequest);
-    String key = SnapshotInfo.newInstance(volumeName,
-        bucketName, snapshotName).getTableKey();
+    String key = SnapshotInfo.getTableKey(volumeName,
+        bucketName, snapshotName);
 
     Assert.assertNull(omMetadataManager.getSnapshotInfoTable().get(key));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org