You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by pr...@apache.org on 2022/08/25 04:23:42 UTC

[ozone] branch HDDS-6517-Snapshot updated: HDDS-6853. [Snapshot] Implement Create Snapshot API. (#3652)

This is an automated email from the ASF dual-hosted git repository.

prashantpogde pushed a commit to branch HDDS-6517-Snapshot
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-6517-Snapshot by this push:
     new 4dba9020ed HDDS-6853. [Snapshot] Implement Create Snapshot API. (#3652)
4dba9020ed is described below

commit 4dba9020ed16b87ca96f0d517d7e33c48cd81f01
Author: GeorgeJahad <gi...@blackbirdsystems.net>
AuthorDate: Wed Aug 24 21:23:37 2022 -0700

    HDDS-6853. [Snapshot] Implement Create Snapshot API. (#3652)
    
    * rebased
    
    * fixed equals
    
    * trigger new CI check
    
    * fix merge from master
    
    * checkstyle
    
    * use the UUID for the directory name
    
    * changed protocol from snapshotPath to vol/bucket
    
    * added official SNAPSHOT_LOCK
    
    * fixed the snapshot lock resource name
    
    * fixed snapshot lock methods
    
    * checkstyle
    
    * cleanup
    
    * now throws on failure to create directories
    
    Co-authored-by: George Jahad <ge...@georgejahad.com>
---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   5 +
 .../hadoop/hdds/utils/db/RDBCheckpointManager.java |  21 +-
 .../org/apache/hadoop/hdds/utils/db/RDBStore.java  |  25 ++-
 .../apache/hadoop/ozone/client/ObjectStore.java    |  13 ++
 .../ozone/client/protocol/ClientProtocol.java      |  12 ++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  19 ++
 .../main/java/org/apache/hadoop/ozone/OmUtils.java |  18 ++
 .../org/apache/hadoop/ozone/audit/OMAction.java    |   3 +-
 .../hadoop/ozone/om/exceptions/OMException.java    |   4 +-
 .../hadoop/ozone/om/helpers/SnapshotInfo.java      | 122 ++++++++++-
 .../hadoop/ozone/om/lock/OzoneManagerLock.java     |   8 +-
 .../hadoop/ozone/om/lock/OzoneManagerLockUtil.java |  12 ++
 .../ozone/om/protocol/OzoneManagerProtocol.java    |  14 ++
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  29 +++
 .../ozone/om/helpers/TestOmSnapshotInfo.java       |  10 +-
 .../hadoop/ozone/om/lock/TestOzoneManagerLock.java |   9 +-
 .../src/main/proto/OmClientProtocol.proto          |  15 ++
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |  20 ++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   2 +-
 .../apache/hadoop/ozone/om/SnapshotManager.java    |  48 +++++
 .../om/ratis/utils/OzoneManagerRatisUtils.java     |   4 +-
 .../request/snapshot/OMSnapshotCreateRequest.java  | 176 +++++++++++++++
 .../ozone/om/request/snapshot/package-info.java    |  23 ++
 .../snapshot/OMSnapshotCreateResponse.java         |  72 +++++++
 .../ozone/om/response/snapshot/package-info.java   |  23 ++
 .../ozone/om/request/OMRequestTestUtils.java       |  15 ++
 .../snapshot/TestOMSnapshotCreateRequest.java      | 235 +++++++++++++++++++++
 .../ozone/om/request/snapshot/package-info.java    |  23 ++
 .../snapshot/TestOMSnapshotCreateResponse.java     | 119 +++++++++++
 .../ozone/om/response/snapshot/package-info.java   |  23 ++
 .../hadoop/ozone/client/ClientProtocolStub.java    |   7 +
 .../shell/snapshot/CreateSnapshotHandler.java      |  63 ++++++
 .../ozone/shell/snapshot/SnapshotCommands.java     |  72 +++++++
 .../hadoop/ozone/shell/snapshot/package-info.java  |  23 ++
 34 files changed, 1271 insertions(+), 16 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index a3fb9e0ee9..97dd7fde0b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -549,4 +549,9 @@ public final class OzoneConsts {
    * tenant.
    */
   public static final int OZONE_MAXIMUM_ACCESS_ID_LENGTH = 100;
+
+  public static final String OM_SNAPSHOT_NAME = "snapshotName";
+  public static final String OM_SNAPSHOT_DIR = "db.snapshots";
+  public static final String OM_SNAPSHOT_INDICATOR = ".snapshot";
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
index 9cc23b53ba..6835fe559a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBCheckpointManager.java
@@ -54,12 +54,13 @@ public class RDBCheckpointManager {
   }
 
   /**
-   * Create RocksDB snapshot by saving a checkpoint to a directory.
+   * Create Ozone snapshot by saving a RocksDB checkpoint to a directory.
    *
    * @param parentDir The directory where the checkpoint needs to be created.
+   * @param name checkpoint dir name, appended to the prefix (null for default)
    * @return RocksDB specific Checkpoint information object.
    */
-  public RocksDBCheckpoint createCheckpoint(String parentDir) {
+  public RocksDBCheckpoint createCheckpoint(String parentDir, String name) {
     try {
       long currentTime = System.currentTimeMillis();
 
@@ -67,7 +68,10 @@ public class RDBCheckpointManager {
       if (StringUtils.isNotEmpty(checkpointNamePrefix)) {
         checkpointDir += checkpointNamePrefix;
       }
-      checkpointDir += "_" + RDB_CHECKPOINT_DIR_PREFIX + currentTime;
+      if (name == null) {
+        name = "_" + RDB_CHECKPOINT_DIR_PREFIX + currentTime;
+      }
+      checkpointDir += name;
 
       Path checkpointPath = Paths.get(parentDir, checkpointDir);
       Instant start = Instant.now();
@@ -90,4 +94,15 @@ public class RDBCheckpointManager {
     }
     return null;
   }
+
+  /**
+   * Create RocksDB snapshot by saving a checkpoint to a directory.
+   *
+   * @param parentDir The directory where the checkpoint needs to be created.
+   * @return RocksDB specific Checkpoint information object.
+   */
+  public RocksDBCheckpoint createCheckpoint(String parentDir) {
+    return createCheckpoint(parentDir, null);
+  }
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
index 9618f66cd7..a0ccd10e4f 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStore.java
@@ -45,6 +45,8 @@ import org.rocksdb.TransactionLogIterator.BatchResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+
 /**
  * RocksDB Store that supports creating Tables in DB.
  */
@@ -57,6 +59,7 @@ public class RDBStore implements DBStore {
   private ObjectName statMBeanName;
   private final RDBCheckpointManager checkPointManager;
   private final String checkpointsParentDir;
+  private final String snapshotsParentDir;
   private final RDBMetrics rdbMetrics;
 
   @VisibleForTesting
@@ -100,7 +103,22 @@ public class RDBStore implements DBStore {
       if (!checkpointsDir.exists()) {
         boolean success = checkpointsDir.mkdir();
         if (!success) {
-          LOG.warn("Unable to create RocksDB checkpoint directory");
+          throw new IOException(
+              "Unable to create RocksDB checkpoint directory: " +
+              checkpointsParentDir);
+        }
+      }
+
+      // Create the snapshot directory if it does not exist.
+      snapshotsParentDir = Paths.get(dbLocation.getParent(),
+          OM_SNAPSHOT_DIR).toString();
+      File snapshotsDir = new File(snapshotsParentDir);
+      if (!snapshotsDir.exists()) {
+        boolean success = snapshotsDir.mkdir();
+        if (!success) {
+          throw new IOException(
+              "Unable to create RocksDB snapshot directory: " +
+              snapshotsParentDir);
         }
       }
 
@@ -246,6 +264,11 @@ public class RDBStore implements DBStore {
     return checkPointManager.createCheckpoint(checkpointsParentDir);
   }
 
+  public DBCheckpoint getSnapshot(String name) throws IOException {
+    this.flushLog(true);
+    return checkPointManager.createCheckpoint(snapshotsParentDir, name);
+  }
+
   @Override
   public File getDbLocation() {
     return dbLocation;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
index a38c80e153..c8e1c1f54c 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/ObjectStore.java
@@ -536,4 +536,17 @@ public class ObjectStore {
     return proxy.getAcl(obj);
   }
 
+  /**
+   * Create snapshot.
+   * @param volumeName vol to be used
+   * @param bucketName bucket to be used
+   * @param snapshotName name to be used
+   * @return name used
+   * @throws IOException
+   */
+  public String createSnapshot(String volumeName,
+      String bucketName, String snapshotName) throws IOException {
+    return proxy.createSnapshot(volumeName, bucketName, snapshotName);
+  }
+
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 70a04406a0..392004c618 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -957,4 +957,16 @@ public interface ClientProtocol {
       Map<DatanodeDetails, OzoneInputStream>> getKeysEveryReplicas(
           String volumeName, String bucketName, String keyName)
       throws IOException;
+
+  /**
+   * Create snapshot.
+   * @param volumeName vol to be used
+   * @param bucketName bucket to be used
+   * @param snapshotName name to be used
+   * @return name used
+   * @throws IOException
+   */
+  String createSnapshot(String volumeName,
+      String bucketName, String snapshotName) throws IOException;
+  
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index adbf5d7061..5a878e3188 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -909,6 +909,25 @@ public class RpcClient implements ClientProtocol {
     ozoneManagerClient.tenantRevokeUserAccessId(accessId);
   }
 
+  /**
+   * Create Snapshot.
+   * @param volumeName vol to be used
+   * @param bucketName bucket to be used
+   * @param snapshotName name to be used
+   * @return name used
+   * @throws IOException
+   */
+  @Override
+  public String createSnapshot(String volumeName,
+      String bucketName, String snapshotName) throws IOException {
+    Preconditions.checkArgument(Strings.isNotBlank(volumeName),
+        "volume can't be null or empty.");
+    Preconditions.checkArgument(Strings.isNotBlank(bucketName),
+        "bucket can't be null or empty.");
+    return ozoneManagerClient.createSnapshot(volumeName,
+        bucketName, snapshotName);
+  }
+
   /**
    * Assign admin role to an accessId in a tenant.
    * @param accessId access ID.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index a5bfb81c7b..36f04ae566 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -316,6 +316,7 @@ public final class OmUtils {
     case TenantAssignAdmin:
     case TenantRevokeAdmin:
     case SetRangerServiceVersion:
+    case CreateSnapshot:
       return false;
     default:
       LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
@@ -548,6 +549,23 @@ public final class OmUtils {
     }
   }
 
+  /**
+   * Verify snapshot name is a valid DNS name.
+   */
+  public static void validateSnapshotName(String snapshotName)
+      throws OMException {
+    // allow null name, for when user wants generated name
+    if (snapshotName == null) {
+      return;
+    }
+    try {
+      HddsClientUtils.verifyResourceName(snapshotName);
+    } catch (IllegalArgumentException e) {
+      throw new OMException("Invalid snapshot name: " + snapshotName,
+          OMException.ResultCodes.INVALID_SNAPSHOT_ERROR);
+    }
+  }
+
   /**
    * Return OM Client Rpc Time out.
    */
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
index 66673da79e..13d8b16598 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/audit/OMAction.java
@@ -88,7 +88,8 @@ public enum OMAction implements AuditAction {
 
   TENANT_ASSIGN_ADMIN,
   TENANT_REVOKE_ADMIN,
-  TENANT_LIST_USER;
+  TENANT_LIST_USER,
+  CREATE_SNAPSHOT;
 
   @Override
   public String getAction() {
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index a669f8a7df..4b4a1698ec 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -257,6 +257,8 @@ public class OMException extends IOException {
     VOLUME_IS_REFERENCED,
     TENANT_NOT_EMPTY,
 
-    FEATURE_NOT_ENABLED
+    FEATURE_NOT_ENABLED,
+
+    INVALID_SNAPSHOT_ERROR
   }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
index 37273f2f59..23e72990cb 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/SnapshotInfo.java
@@ -18,11 +18,29 @@ package org.apache.hadoop.ozone.om.helpers;
  *  limitations under the License.
  */
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.audit.Auditable;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.SnapshotStatusProto;
 
 import com.google.common.base.Preconditions;
 
+import org.apache.hadoop.util.Time;
+
+import java.time.format.DateTimeFormatter;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.time.ZoneId;
+
+import java.util.Objects;
+import java.util.UUID;
+import java.util.Map;
+import java.util.LinkedHashMap;
+
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+
 /**
  * This class is used for storing info related to Snapshots.
  *
@@ -31,7 +49,7 @@ import com.google.common.base.Preconditions;
  * snapshot checkpoint directory, previous snapshotid
  * for the snapshot path & global amongst other necessary fields.
  */
-public final class SnapshotInfo {
+public final class SnapshotInfo implements Auditable {
 
   /**
    * SnapshotStatus enum composed of
@@ -71,8 +89,13 @@ public final class SnapshotInfo {
             "BUG: missing valid SnapshotStatus, found status=" + status);
       }
     }
-  };
+  }
 
+  private static final String SEPARATOR = "-";
+  private static final long INVALID_TIMESTAMP = -1;
+  private static final String INITIAL_SNAPSHOT_ID =
+      UUID.randomUUID().toString();
+    
   private final String snapshotID;  // UUID
   private String name;
   private String volumeName;
@@ -342,7 +365,7 @@ public final class SnapshotInfo {
 
   /**
    * Parses SnapshotInfo protobuf and creates SnapshotInfo.
-   * @param snapshotInfo protobuf
+   * @param snapshotInfoProto protobuf
    * @return instance of SnapshotInfo
    */
   public static SnapshotInfo getFromProtobuf(
@@ -365,4 +388,97 @@ public final class SnapshotInfo {
 
     return osib.build();
   }
+  @Override
+  public Map<String, String> toAuditMap() {
+    Map<String, String> auditMap = new LinkedHashMap<>();
+    auditMap.put(OzoneConsts.VOLUME, getVolumeName());
+    auditMap.put(OzoneConsts.BUCKET, getBucketName());
+    auditMap.put(OzoneConsts.OM_SNAPSHOT_NAME, this.name);
+    return auditMap;
+  }
+
+  /**
+   * Get the name of the checkpoint directory.
+   */
+  public static String getCheckpointDirName(String snapshotId) {
+    return SEPARATOR + snapshotId;
+  }
+  /**
+   * Get the name of this snapshot's checkpoint directory.
+   */
+  public String getCheckpointDirName() {
+    return getCheckpointDirName(getSnapshotID());
+  }
+
+  /**
+   * Get the table key for this snapshot.
+   */
+  public String getTableKey() {
+    return OM_KEY_PREFIX + snapshotPath + OM_KEY_PREFIX + name;
+  }
+
+  /**
+   * Generate the default snapshot name (used if the user doesn't provide one).
+   */
+  @VisibleForTesting
+  public static String generateName(long initialTime) {
+    String timePattern = "yyyyMMdd-HHmmss.SSS";
+    DateTimeFormatter formatter = DateTimeFormatter.ofPattern(timePattern);
+    Instant instant = Instant.ofEpochMilli(initialTime);
+    return "s" + formatter.format(
+        ZonedDateTime.ofInstant(instant, ZoneId.of("UTC")));
+  }
+  
+  /**
+   * Factory for making standard instance.
+   */
+  public static SnapshotInfo newInstance(String volumeName,
+      String bucketName, String snapshotName) {
+    SnapshotInfo.Builder builder = new SnapshotInfo.Builder();
+    String id = UUID.randomUUID().toString();
+    long initialTime = Time.now();
+    if (StringUtils.isBlank(snapshotName)) {
+      snapshotName = generateName(initialTime);
+    }
+    builder.setSnapshotID(id)
+        .setName(snapshotName)
+        .setCreationTime(initialTime)
+        .setDeletionTime(INVALID_TIMESTAMP)
+        .setPathPreviousSnapshotID(INITIAL_SNAPSHOT_ID)
+        .setGlobalPreviousSnapshotID(INITIAL_SNAPSHOT_ID)
+        .setSnapshotPath(volumeName + OM_KEY_PREFIX + bucketName)
+        .setVolumeName(volumeName)
+        .setBucketName(bucketName)
+        .setCheckpointDir(getCheckpointDirName(id));
+    return builder.build();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SnapshotInfo that = (SnapshotInfo) o;
+    return creationTime == that.creationTime &&
+        deletionTime == that.deletionTime &&
+        snapshotID.equals(that.snapshotID) &&
+        name.equals(that.name) && volumeName.equals(that.volumeName) &&
+        bucketName.equals(that.bucketName) &&
+        snapshotStatus == that.snapshotStatus &&
+        pathPreviousSnapshotID.equals(that.pathPreviousSnapshotID) &&
+        globalPreviousSnapshotID.equals(that.globalPreviousSnapshotID) &&
+        snapshotPath.equals(that.snapshotPath) &&
+        checkpointDir.equals(that.checkpointDir);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(snapshotID, name, volumeName, bucketName,
+        snapshotStatus,
+        creationTime, deletionTime, pathPreviousSnapshotID,
+        globalPreviousSnapshotID, snapshotPath, checkpointDir);
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
index 2e5c492c08..353264ac49 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLock.java
@@ -257,12 +257,15 @@ public class OzoneManagerLock {
     } else if (resources.length == 2 && resource == Resource.BUCKET_LOCK) {
       return OzoneManagerLockUtil.generateBucketLockName(resources[0],
           resources[1]);
+    } else if (resources.length == 3 && resource == Resource.SNAPSHOT_LOCK) {
+      return OzoneManagerLockUtil.generateSnapshotLockName(resources[0],
+          resources[1], resources[2]);
     } else if (resources.length == 3 && resource == Resource.KEY_PATH_LOCK) {
       return OzoneManagerLockUtil.generateKeyPathLockName(resources[0],
           resources[1], resources[2]);
     } else {
       throw new IllegalArgumentException("acquire lock is supported on single" +
-          " resource for all locks except for resource bucket");
+          " resource for all locks except for resource bucket/snapshot");
     }
   }
 
@@ -648,7 +651,8 @@ public class OzoneManagerLock {
 
     S3_SECRET_LOCK((byte) 4, "S3_SECRET_LOCK"), // 31
     KEY_PATH_LOCK((byte) 5, "KEY_PATH_LOCK"), //63
-    PREFIX_LOCK((byte) 6, "PREFIX_LOCK"); //127
+    PREFIX_LOCK((byte) 6, "PREFIX_LOCK"), //127
+    SNAPSHOT_LOCK((byte) 7, "SNAPSHOT_LOCK"); // = 255
 
     // level of the resource
     private byte lockLevel;
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java
index cc4fb4c3cb..764e65cba8 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/lock/OzoneManagerLockUtil.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_S3_SECRET;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_INDICATOR;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_USER_PREFIX;
 
 /**
@@ -71,6 +72,17 @@ final class OzoneManagerLockUtil {
     return OM_KEY_PREFIX + volumeName + OM_KEY_PREFIX + bucketName;
   }
 
+  /**
+   * Generate snapshot lock name.
+   * @param volumeName
+   * @param bucketName
+   */
+  public static String generateSnapshotLockName(String volumeName,
+      String bucketName, String snapshotName) {
+    return generateBucketLockName(volumeName, bucketName) +
+        OM_KEY_PREFIX + OM_SNAPSHOT_INDICATOR + OM_KEY_PREFIX + snapshotName;
+  }
+
   /**
    * Generate key path lock name.
    * @param volumeName
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 171f515bbb..3e7a756c2a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -641,6 +641,20 @@ public interface OzoneManagerProtocol
         "this to be implemented, as write requests use a new approach");
   }
 
+  /**
+   * Create snapshot.
+   * @param volumeName vol to be used
+   * @param bucketName bucket to be used
+   * @param snapshotName name to be used
+   * @return name used
+   * @throws IOException
+   */
+  default String createSnapshot(String volumeName,
+      String bucketName, String snapshotName) throws IOException {
+    throw new UnsupportedOperationException("OzoneManager does not require " +
+        "this to be implemented");
+  }
+
   /**
    * Assign admin role to a user identified by an accessId in a tenant.
    * @param accessId access ID.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 5bc7784f8e..b7a65448a9 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hdds.annotation.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
@@ -61,6 +62,7 @@ import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.S3VolumeContext;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
 import org.apache.hadoop.ozone.om.helpers.TenantStateList;
 import org.apache.hadoop.ozone.om.helpers.TenantUserInfoValue;
 import org.apache.hadoop.ozone.om.helpers.TenantUserList;
@@ -1085,6 +1087,33 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
     handleError(omResponse);
   }
 
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public String createSnapshot(String volumeName,
+      String bucketName, String snapshotName)
+      throws IOException {
+
+    final OzoneManagerProtocolProtos.CreateSnapshotRequest.Builder
+        requestBuilder =
+        OzoneManagerProtocolProtos.CreateSnapshotRequest.newBuilder()
+            .setVolumeName(volumeName)
+            .setBucketName(bucketName);
+    if (!StringUtils.isBlank(snapshotName)) {
+      requestBuilder.setSnapshotName(snapshotName);
+    }
+      
+    final OMRequest omRequest = createOMRequest(Type.CreateSnapshot)
+        .setCreateSnapshotRequest(requestBuilder)
+        .build();
+    final OMResponse omResponse = submitRequest(omRequest);
+    handleError(omResponse);
+    SnapshotInfo snapshotInfo = SnapshotInfo.getFromProtobuf(
+        omResponse.getCreateSnapshotResponse().getSnapshotInfo());
+    return snapshotInfo.getName();
+  }
+
   /**
    * {@inheritDoc}
    */
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
index d70522ba95..23c6dbb5e8 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/helpers/TestOmSnapshotInfo.java
@@ -108,7 +108,7 @@ public class TestOmSnapshotInfo {
         snapshotInfoEntryActual.getBucketName());
     Assert.assertEquals(snapshotInfoEntryExpected.getSnapshotStatus(),
         snapshotInfoEntryActual.getSnapshotStatus());
-
+    Assert.assertEquals(snapshotInfoEntryExpected, snapshotInfoEntryActual);
   }
 
   @Test
@@ -129,6 +129,14 @@ public class TestOmSnapshotInfo {
         snapshotInfoActual.getBucketName());
     Assert.assertEquals(snapshotInfoExpected.getSnapshotStatus(),
         snapshotInfoActual.getSnapshotStatus());
+    Assert.assertEquals(snapshotInfoExpected, snapshotInfoActual);
+  }
 
+  @Test
+  public void testGenerateName() {
+    // GMT: Sunday, July 10, 2022 7:56:55.001 PM
+    long millis = 1657483015001L;
+    String name = SnapshotInfo.generateName(millis);
+    Assert.assertEquals("s20220710-195655.001", name);
   }
 }
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
index 640d11d1ea..a87887111c 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/lock/TestOzoneManagerLock.java
@@ -265,7 +265,8 @@ public class TestOzoneManagerLock {
     if (resource == OzoneManagerLock.Resource.BUCKET_LOCK) {
       return new String[]{UUID.randomUUID().toString(),
           UUID.randomUUID().toString()};
-    } else if (resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
+    } else if ((resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) ||
+        (resource == OzoneManagerLock.Resource.SNAPSHOT_LOCK)) {
       return new String[]{UUID.randomUUID().toString(),
           UUID.randomUUID().toString(), UUID.randomUUID().toString()};
     } else {
@@ -283,13 +284,17 @@ public class TestOzoneManagerLock {
         resource == OzoneManagerLock.Resource.BUCKET_LOCK) {
       return OzoneManagerLockUtil.generateBucketLockName(resources[0],
           resources[1]);
+    } else if (resources.length == 3 &&
+        resource == OzoneManagerLock.Resource.SNAPSHOT_LOCK) {
+      return OzoneManagerLockUtil.generateSnapshotLockName(resources[0],
+          resources[1], resources[2]);
     } else if (resources.length == 3 &&
         resource == OzoneManagerLock.Resource.KEY_PATH_LOCK) {
       return OzoneManagerLockUtil.generateKeyPathLockName(resources[0],
           resources[1], resources[2]);
     } else {
       throw new IllegalArgumentException("acquire lock is supported on single" +
-          " resource for all locks except for resource bucket");
+          " resource for all locks except for resource bucket/snapshot");
     }
   }
 
diff --git a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
index 842c9834cb..d46dda9ff3 100644
--- a/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
+++ b/hadoop-ozone/interface-client/src/main/proto/OmClientProtocol.proto
@@ -122,6 +122,7 @@ enum Type {
 
   SetRangerServiceVersion = 107;
   RangerBGSync = 109;
+  CreateSnapshot = 110;
 }
 
 message OMRequest {
@@ -226,6 +227,7 @@ message OMRequest {
 
   optional SetRangerServiceVersionRequest   SetRangerServiceVersionRequest = 107;
   optional RangerBGSyncRequest              RangerBGSyncRequest            = 109;
+  optional CreateSnapshotRequest            CreateSnapshotRequest          = 110;
 }
 
 message OMResponse {
@@ -323,6 +325,7 @@ message OMResponse {
 
   optional SetRangerServiceVersionResponse   SetRangerServiceVersionResponse = 107;
   optional RangerBGSyncResponse              RangerBGSyncResponse          = 109;
+  optional CreateSnapshotResponse            CreateSnapshotResponse        = 110;
 }
 
 enum Status {
@@ -433,6 +436,8 @@ enum Status {
     TENANT_NOT_EMPTY = 85;
 
     FEATURE_NOT_ENABLED = 86;
+
+    INVALID_SNAPSHOT_ERROR = 87;
 }
 
 /**
@@ -1604,6 +1609,12 @@ message SetRangerServiceVersionRequest {
     required uint64 rangerServiceVersion = 1;
 }
 
+message CreateSnapshotRequest {
+  required string volumeName = 1;
+  required string bucketName = 2;
+  optional string snapshotName = 3;
+}
+
 message DeleteTenantRequest {
     optional string tenantId = 1;
 }
@@ -1641,6 +1652,10 @@ message CreateTenantResponse {
 message SetRangerServiceVersionResponse {
 }
 
+message CreateSnapshotResponse {
+  required SnapshotInfo snapshotInfo = 1;
+}
+
 message DeleteTenantResponse {
     optional string volumeName = 1;
     optional int64 volRefCount = 2;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index b0ad24f5fe..2884f44cf0 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -68,6 +68,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numBucketS3Lists;
   private @Metric MutableCounterLong numInitiateMultipartUploads;
   private @Metric MutableCounterLong numCompleteMultipartUploads;
+  private @Metric MutableCounterLong numSnapshotCreates;
 
   private @Metric MutableCounterLong numGetFileStatus;
   private @Metric MutableCounterLong numCreateDirectory;
@@ -115,6 +116,7 @@ public class OMMetrics {
   private @Metric MutableCounterLong numListMultipartUploadParts;
   private @Metric MutableCounterLong numListMultipartUploadPartFails;
   private @Metric MutableCounterLong numOpenKeyDeleteRequestFails;
+  private @Metric MutableCounterLong numSnapshotCreateFails;
 
   // Number of tenant operations attempted
   private @Metric MutableCounterLong numTenantOps;
@@ -419,6 +421,15 @@ public class OMMetrics {
     numCompleteMultipartUploads.incr();
   }
 
+  public void incNumSnapshotCreates() {
+    numSnapshotCreates.incr();
+  }
+
+  public void incNumSnapshotCreateFails() {
+    numSnapshotCreateFails.incr();
+  }
+
+
   public void incNumCompleteMultipartUploadFails() {
     numCompleteMultipartUploadFails.incr();
   }
@@ -1077,6 +1088,15 @@ public class OMMetrics {
     return numTenantTenantUserLists.value();
   }
 
+  public long getNumSnapshotCreates() {
+    return numSnapshotCreates.value();
+  }
+
+  public long getNumSnapshotCreateFails() {
+    return numSnapshotCreateFails.value();
+  }
+
+
   public void incNumTrashRenames() {
     numTrashRenames.incr();
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index d26dae0f80..1b77ee82f8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -2387,7 +2387,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         remoteIp != null ? remoteIp.getHostName() : omRpcAddress.getHostName());
   }
 
-  private boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
+  public boolean isOwner(UserGroupInformation callerUgi, String ownerName) {
     if (ownerName == null) {
       return false;
     }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotManager.java
new file mode 100644
index 0000000000..31af5e19eb
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/SnapshotManager.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om;
+
+import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
+import org.apache.hadoop.hdds.utils.db.RDBStore;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+
+import java.io.IOException;
+
+/**
+ * This class is used to manage/create OM snapshots.
+ */
+public final class SnapshotManager {
+
+  
+  /**
+   * Creates snapshot checkpoint that corresponds with SnapshotInfo.
+   * @param omMetadataManager the metadata manager
+   * @param snapshotInfo The metadata of snapshot to be created
+   * @return instance of DBCheckpoint
+   */
+  public static DBCheckpoint createSnapshot(
+      OMMetadataManager omMetadataManager, SnapshotInfo snapshotInfo)
+      throws IOException {
+    RDBStore store = (RDBStore) omMetadataManager.getStore();
+    return store.getSnapshot(snapshotInfo.getCheckpointDirName());
+  }
+
+  private SnapshotManager() { }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
index cd52bd7143..ad730cd5bc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/utils/OzoneManagerRatisUtils.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.ozone.om.request.s3.tenant.OMTenantRevokeUserAccessIdRe
 import org.apache.hadoop.ozone.om.request.security.OMCancelDelegationTokenRequest;
 import org.apache.hadoop.ozone.om.request.security.OMGetDelegationTokenRequest;
 import org.apache.hadoop.ozone.om.request.security.OMRenewDelegationTokenRequest;
+import org.apache.hadoop.ozone.om.request.snapshot.OMSnapshotCreateRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMCancelPrepareRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMFinalizeUpgradeRequest;
 import org.apache.hadoop.ozone.om.request.upgrade.OMPrepareRequest;
@@ -209,6 +210,8 @@ public final class OzoneManagerRatisUtils {
       return new OMTenantRevokeAdminRequest(omRequest);
     case SetRangerServiceVersion:
       return new OMSetRangerServiceVersionRequest(omRequest);
+    case CreateSnapshot:
+      return new OMSnapshotCreateRequest(omRequest);
     case DeleteOpenKeys:
       BucketLayout bktLayout = BucketLayout.DEFAULT;
       if (omRequest.getDeleteOpenKeysRequest().hasBucketLayout()) {
@@ -216,7 +219,6 @@ public final class OzoneManagerRatisUtils {
             omRequest.getDeleteOpenKeysRequest().getBucketLayout());
       }
       return new OMOpenKeysDeleteRequest(omRequest, bktLayout);
-
     /*
      * Key requests that can have multiple variants based on the bucket layout
      * should be created using {@link BucketLayoutAwareOMKeyRequestFactory}.
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
new file mode 100644
index 0000000000..7efa4ab7f9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/OMSnapshotCreateRequest.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import com.google.common.base.Optional;
+import org.apache.hadoop.hdds.utils.db.cache.CacheKey;
+import org.apache.hadoop.hdds.utils.db.cache.CacheValue;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.OMAction;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.request.OMClientRequest;
+import org.apache.hadoop.ozone.om.request.util.OmResponseUtil;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.om.response.snapshot.OMSnapshotCreateResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.CreateSnapshotResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
+import org.apache.hadoop.ozone.security.acl.OzoneObj;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.FILE_ALREADY_EXISTS;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.BUCKET_LOCK;
+import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.Resource.SNAPSHOT_LOCK;
+
+
+/**
+ * Handles CreateSnapshot Request.
+ */
+public class OMSnapshotCreateRequest extends OMClientRequest {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OMSnapshotCreateRequest.class);
+
+  private final String snapshotPath;
+  private final String volumeName;
+  private final String bucketName;
+  private final String snapshotName;
+  private final SnapshotInfo snapshotInfo;
+
+  public OMSnapshotCreateRequest(OMRequest omRequest) {
+    super(omRequest);
+    CreateSnapshotRequest createSnapshotRequest = omRequest
+        .getCreateSnapshotRequest();
+    volumeName = createSnapshotRequest.getVolumeName();
+    bucketName = createSnapshotRequest.getBucketName();
+    String possibleName = createSnapshotRequest.getSnapshotName();
+    snapshotInfo =
+        SnapshotInfo.newInstance(volumeName, bucketName, possibleName);
+    snapshotName = snapshotInfo.getName();
+    snapshotPath = snapshotInfo.getSnapshotPath();
+  }
+
+  @Override
+  public OMRequest preExecute(OzoneManager ozoneManager) throws IOException {
+    final OMRequest omRequest = super.preExecute(ozoneManager);
+    // Verify name
+    OmUtils.validateSnapshotName(snapshotName);
+
+    UserGroupInformation ugi = createUGI();
+    String bucketOwner = ozoneManager.getBucketOwner(volumeName, bucketName,
+        IAccessAuthorizer.ACLType.READ, OzoneObj.ResourceType.BUCKET);
+    if (!ozoneManager.isAdmin(ugi) &&
+        !ozoneManager.isOwner(ugi, bucketOwner)) {
+      throw new OMException(
+          "Only bucket owners/admins can create snapshots",
+          OMException.ResultCodes.PERMISSION_DENIED);
+    }
+    return omRequest;
+  }
+  
+  @Override
+  public OMClientResponse validateAndUpdateCache(OzoneManager ozoneManager,
+      long transactionLogIndex,
+      OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper) {
+    // Count every attempt; failed attempts are also counted further below.
+    OMMetrics omMetrics = ozoneManager.getMetrics();
+    omMetrics.incNumSnapshotCreates();
+
+    boolean acquiredBucketLock = false, acquiredSnapshotLock = false;
+    IOException exception = null;
+    OMMetadataManager omMetadataManager = ozoneManager.getMetadataManager();
+    
+    OMResponse.Builder omResponse = OmResponseUtil.getOMResponseBuilder(
+        getOmRequest());
+    OMClientResponse omClientResponse = null;
+    AuditLogger auditLogger = ozoneManager.getAuditLogger();
+
+    OzoneManagerProtocolProtos.UserInfo userInfo = getOmRequest().getUserInfo();
+    String key = snapshotInfo.getTableKey();
+    try {
+      // Hold the bucket read lock so the bucket does not
+      // get deleted while the snapshot is being created.
+      acquiredBucketLock =
+          omMetadataManager.getLock().acquireReadLock(BUCKET_LOCK,
+              volumeName, bucketName);
+
+      acquiredSnapshotLock =
+          omMetadataManager.getLock().acquireWriteLock(SNAPSHOT_LOCK,
+              volumeName, bucketName, snapshotName);
+
+      // Reject the request if the snapshot already exists.
+      if (omMetadataManager.getSnapshotInfoTable().isExist(key)) {
+        LOG.debug("snapshot: {} already exists ", key);
+        throw new OMException("Snapshot already exists", FILE_ALREADY_EXISTS);
+      }
+
+      omMetadataManager.getSnapshotInfoTable()
+          .addCacheEntry(new CacheKey<>(key),
+            new CacheValue<>(Optional.of(snapshotInfo), transactionLogIndex));
+
+      omResponse.setCreateSnapshotResponse(
+          CreateSnapshotResponse.newBuilder()
+          .setSnapshotInfo(snapshotInfo.getProtobuf()));
+      omClientResponse = new OMSnapshotCreateResponse(
+          omResponse.build(), volumeName, bucketName, snapshotName);
+    } catch (IOException ex) {
+      exception = ex;
+      omClientResponse = new OMSnapshotCreateResponse(
+          createErrorOMResponse(omResponse, exception));
+    } finally {
+      addResponseToDoubleBuffer(transactionLogIndex, omClientResponse,
+          ozoneManagerDoubleBufferHelper);
+      if (acquiredSnapshotLock) {
+        omMetadataManager.getLock().releaseWriteLock(SNAPSHOT_LOCK, volumeName,
+            bucketName, snapshotName);
+      }
+      if (acquiredBucketLock) {
+        omMetadataManager.getLock().releaseReadLock(BUCKET_LOCK, volumeName,
+            bucketName);
+      }
+    }
+
+    // Performing audit logging outside the lock.
+    auditLog(auditLogger, buildAuditMessage(OMAction.CREATE_SNAPSHOT,
+        snapshotInfo.toAuditMap(), exception, userInfo));
+    // On failure the exception is passed to the logger so its cause is kept.
+    if (exception == null) {
+      LOG.info("created snapshot: name {} in snapshotPath: {}", snapshotName,
+          snapshotPath);
+    } else {
+      omMetrics.incNumSnapshotCreateFails();
+      LOG.error("Snapshot creation failed for name:{} in snapshotPath:{}",
+          snapshotName, snapshotPath, exception);
+    }
+    return omClientResponse;
+  }
+  
+}
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/package-info.java
new file mode 100644
index 0000000000..f1edc13337
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/request/snapshot/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains classes related to snapshot requests.
+ */
+package org.apache.hadoop.ozone.om.request.snapshot;
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java
new file mode 100644
index 0000000000..5809182fef
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/OMSnapshotCreateResponse.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.SnapshotManager;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.response.CleanupTableInfo;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+import static org.apache.hadoop.ozone.om.OmMetadataManagerImpl.SNAPSHOT_INFO_TABLE;
+
+/**
+ * Response for CreateSnapshot request.
+ */
+@CleanupTableInfo(cleanupTables = {SNAPSHOT_INFO_TABLE})
+public class OMSnapshotCreateResponse extends OMClientResponse {
+
+  @SuppressWarnings("checkstyle:parameternumber")
+  public OMSnapshotCreateResponse(@Nonnull OMResponse omResponse,
+      @Nonnull String volumeName, @Nonnull String bucketName,
+                                  @Nonnull String snapshotName) {
+    super(omResponse);
+  }
+
+  /**
+   * For when the request is not successful.
+   * For a successful request, the other constructor should be used.
+   */
+  public OMSnapshotCreateResponse(@Nonnull OMResponse omResponse) {
+    super(omResponse);
+    checkStatusNotOK();
+  }
+
+  @Override
+  public void addToDBBatch(OMMetadataManager omMetadataManager,
+      BatchOperation batchOperation) throws IOException {
+
+    SnapshotInfo snapshotInfo =
+        SnapshotInfo.getFromProtobuf(
+        getOMResponse().getCreateSnapshotResponse().getSnapshotInfo());
+
+    // Create the snapshot checkpoint
+    SnapshotManager.createSnapshot(omMetadataManager, snapshotInfo);
+
+    String key = snapshotInfo.getTableKey();
+
+    // Add to db
+    omMetadataManager.getSnapshotInfoTable().putWithBatch(batchOperation,
+        key, snapshotInfo);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/package-info.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/package-info.java
new file mode 100644
index 0000000000..c9058ab2a1
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/response/snapshot/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains classes related to OM snapshot responses.
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
index 2679634f9f..970ebe26b4 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/OMRequestTestUtils.java
@@ -1061,6 +1061,21 @@ public final class OMRequestTestUtils {
         .build();
   }
 
+  /**
+   * Create OMRequest for Create Snapshot, i.e. a snapshot named
+   * snapshotName of the bucket bucketName in the volume volumeName,
+   * tagged with a random client ID.
+   */
+  public static OMRequest createSnapshotRequest(String volumeName,
+      String bucketName, String snapshotName) {
+    return OMRequest.newBuilder().setCreateSnapshotRequest(
+            OzoneManagerProtocolProtos.CreateSnapshotRequest.newBuilder()
+                .setVolumeName(volumeName).setBucketName(bucketName)
+                .setSnapshotName(snapshotName))
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateSnapshot)
+        .setClientId(UUID.randomUUID().toString()).build();
+  }
+
   /**
    * Add the Key information to OzoneManager DB and cache.
    * @param omMetadataManager
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
new file mode 100644
index 0000000000..37baf0ccf7
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/TestOMSnapshotCreateRequest.java
@@ -0,0 +1,235 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.request.snapshot;
+
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.audit.AuditLogger;
+import org.apache.hadoop.ozone.audit.AuditMessage;
+
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OMMetrics;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerDoubleBufferHelper;
+import org.apache.hadoop.ozone.om.upgrade.OMLayoutVersionManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ozone.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.ozone.om.request.OMRequestTestUtils;
+import org.apache.hadoop.ozone.om.response.OMClientResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests OMSnapshotCreateRequest class, which handles CreateSnapshot request.
+ */
+public class TestOMSnapshotCreateRequest {
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OzoneManager ozoneManager;
+  private OMMetrics omMetrics;
+  private OMMetadataManager omMetadataManager;
+
+  private String volumeName;
+  private String bucketName;
+  private String snapshotName;
+
+  // Just setting ozoneManagerDoubleBuffer which does nothing.
+  private final OzoneManagerDoubleBufferHelper ozoneManagerDoubleBufferHelper =
+      ((response, transactionIndex) -> null);
+
+  @Before
+  public void setup() throws Exception {
+
+    ozoneManager = mock(OzoneManager.class);
+    omMetrics = OMMetrics.create();
+    OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
+    ozoneConfiguration.set(OMConfigKeys.OZONE_OM_DB_DIRS,
+        folder.newFolder().getAbsolutePath());
+    omMetadataManager = new OmMetadataManagerImpl(ozoneConfiguration);
+    when(ozoneManager.getMetrics()).thenReturn(omMetrics);
+    when(ozoneManager.getMetadataManager()).thenReturn(omMetadataManager);
+    when(ozoneManager.isRatisEnabled()).thenReturn(true);
+    when(ozoneManager.isAdmin((UserGroupInformation) any())).thenReturn(false);
+    when(ozoneManager.isOwner(any(), any())).thenReturn(false);
+    when(ozoneManager.getBucketOwner(any(), any(),
+        any(), any())).thenReturn("dummyBucketOwner");
+    OMLayoutVersionManager lvm = mock(OMLayoutVersionManager.class);
+    when(lvm.getMetadataLayoutVersion()).thenReturn(0);
+    when(ozoneManager.getVersionManager()).thenReturn(lvm);
+    AuditLogger auditLogger = mock(AuditLogger.class);
+    when(ozoneManager.getAuditLogger()).thenReturn(auditLogger);
+    Mockito.doNothing().when(auditLogger).logWrite(any(AuditMessage.class));
+
+    volumeName = UUID.randomUUID().toString();
+    bucketName = UUID.randomUUID().toString();
+    snapshotName = UUID.randomUUID().toString();
+    OMRequestTestUtils.addVolumeAndBucketToDB(
+        volumeName, bucketName, omMetadataManager);
+
+  }
+
+  @After
+  public void stop() {
+    omMetrics.unRegister();
+    Mockito.framework().clearInlineMocks();
+  }
+
+  @Test
+  public void testPreExecute() throws Exception {
+    // set the owner
+    when(ozoneManager.isOwner(any(), any())).thenReturn(true);
+    OMRequest omRequest =
+        OMRequestTestUtils.createSnapshotRequest(
+        volumeName, bucketName, snapshotName);
+    // should not throw
+    doPreExecute(omRequest);
+  }
+
+  @Test
+  public void testPreExecuteBadOwner() throws Exception {
+    // owner not set
+    OMRequest omRequest =
+        OMRequestTestUtils.createSnapshotRequest(
+        volumeName, bucketName, snapshotName);
+    // Check bad owner
+    LambdaTestUtils.intercept(OMException.class,
+        "Only bucket owners/admins can create snapshots",
+        () -> doPreExecute(omRequest));
+  }
+
+  @Test
+  public void testPreExecuteBadName() throws Exception {
+    // check invalid snapshot name
+    String badName = "a?b";
+    OMRequest omRequest =
+        OMRequestTestUtils.createSnapshotRequest(
+        volumeName, bucketName, badName);
+    LambdaTestUtils.intercept(OMException.class,
+        "Invalid snapshot name: " + badName,
+        () -> doPreExecute(omRequest));
+  }
+  
+  @Test
+  public void testValidateAndUpdateCache() throws Exception {
+    when(ozoneManager.isAdmin((UserGroupInformation) any())).thenReturn(true);
+    OMRequest omRequest =
+        OMRequestTestUtils.createSnapshotRequest(
+        volumeName, bucketName, snapshotName);
+    OMSnapshotCreateRequest omSnapshotCreateRequest =
+        doPreExecute(omRequest);
+    String key = SnapshotInfo.newInstance(volumeName,
+        bucketName, snapshotName).getTableKey();
+
+    // As we have not yet called validateAndUpdateCache, get() should
+    // return null.
+    Assert.assertNull(omMetadataManager.getSnapshotInfoTable().get(key));
+
+    // add key to cache
+    OMClientResponse omClientResponse =
+        omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 1,
+            ozoneManagerDoubleBufferHelper);
+    
+    // check cache
+    SnapshotInfo snapshotInfo =
+        omMetadataManager.getSnapshotInfoTable().get(key);
+    Assert.assertNotNull(snapshotInfo);
+
+    // verify table data with response data.
+    SnapshotInfo snapshotInfoFromProto = SnapshotInfo.getFromProtobuf(
+        omClientResponse.getOMResponse()
+        .getCreateSnapshotResponse().getSnapshotInfo());
+    Assert.assertEquals(snapshotInfoFromProto, snapshotInfo);
+
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateSnapshotResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateSnapshot,
+        omResponse.getCmdType());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.OK,
+        omResponse.getStatus());
+  }
+
+  @Test
+  public void testEntryExists() throws Exception {
+    when(ozoneManager.isAdmin((UserGroupInformation) any())).thenReturn(true);
+    OMRequest omRequest =
+        OMRequestTestUtils.createSnapshotRequest(
+        volumeName, bucketName, snapshotName);
+    OMSnapshotCreateRequest omSnapshotCreateRequest = doPreExecute(omRequest);
+    String key = SnapshotInfo.newInstance(volumeName,
+        bucketName, snapshotName).getTableKey();
+
+    Assert.assertNull(omMetadataManager.getSnapshotInfoTable().get(key));
+
+    //create entry
+    omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 1,
+        ozoneManagerDoubleBufferHelper);
+    SnapshotInfo snapshotInfo =
+        omMetadataManager.getSnapshotInfoTable().get(key);
+    Assert.assertNotNull(snapshotInfo);
+
+    // Now try to create again to verify error
+    omRequest =
+        OMRequestTestUtils.createSnapshotRequest(
+        volumeName, bucketName, snapshotName);
+    omSnapshotCreateRequest = doPreExecute(omRequest);
+    OMClientResponse omClientResponse =
+        omSnapshotCreateRequest.validateAndUpdateCache(ozoneManager, 2,
+            ozoneManagerDoubleBufferHelper);
+    
+    OMResponse omResponse = omClientResponse.getOMResponse();
+    Assert.assertNotNull(omResponse.getCreateSnapshotResponse());
+    Assert.assertEquals(OzoneManagerProtocolProtos.Status.FILE_ALREADY_EXISTS,
+        omResponse.getStatus());
+  }
+
+  private OMSnapshotCreateRequest doPreExecute(
+      OMRequest originalRequest) throws Exception {
+    OMSnapshotCreateRequest omSnapshotCreateRequest =
+        new OMSnapshotCreateRequest(originalRequest);
+
+    OMRequest modifiedRequest =
+        omSnapshotCreateRequest.preExecute(ozoneManager);
+    return new OMSnapshotCreateRequest(modifiedRequest);
+  }
+
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/package-info.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/package-info.java
new file mode 100644
index 0000000000..d20b493568
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/request/snapshot/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains test classes for snapshot requests.
+ */
+package org.apache.hadoop.ozone.om.request.snapshot;
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
new file mode 100644
index 0000000000..755fed158b
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/TestOMSnapshotCreateResponse.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.ozone.om.response.snapshot;
+
+import java.io.File;
+import java.util.UUID;
+
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.om.helpers.SnapshotInfo;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .CreateSnapshotResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
+import org.apache.hadoop.hdds.utils.db.BatchOperation;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_DB_NAME;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_KEY_PREFIX;
+import static org.apache.hadoop.ozone.OzoneConsts.OM_SNAPSHOT_DIR;
+
+
+/**
+ * Tests that OMSnapshotCreateResponse adds the snapshot to the OM DB.
+ */
+public class TestOMSnapshotCreateResponse {
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  private OMMetadataManager omMetadataManager;
+  private BatchOperation batchOperation;
+  private String fsPath;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    fsPath = folder.newFolder().getAbsolutePath();
+    conf.set(OMConfigKeys.OZONE_OM_DB_DIRS, fsPath);
+    omMetadataManager = new OmMetadataManagerImpl(conf);
+    batchOperation = omMetadataManager.getStore().initBatchOperation();
+  }
+
+  @After
+  public void tearDown() {
+    if (batchOperation != null) {
+      batchOperation.close();
+    }
+  }
+
+  @Test
+  public void testAddToDBBatch() throws Exception {
+    String volumeName = UUID.randomUUID().toString();
+    String bucketName = UUID.randomUUID().toString();
+    String snapshotName = UUID.randomUUID().toString();
+    SnapshotInfo expectedInfo =
+        SnapshotInfo.newInstance(volumeName, bucketName, snapshotName);
+
+    // The snapshot info table starts out empty.
+    Assert.assertEquals(0, omMetadataManager.countRowsInTable(
+        omMetadataManager.getSnapshotInfoTable()));
+
+    // Build an OK CreateSnapshot response carrying the snapshot info.
+    CreateSnapshotResponse createSnapshotResponse =
+        CreateSnapshotResponse.newBuilder()
+            .setSnapshotInfo(expectedInfo.getProtobuf()).build();
+    OMResponse omResponse = OMResponse.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateSnapshot)
+        .setStatus(OzoneManagerProtocolProtos.Status.OK)
+        .setCreateSnapshotResponse(createSnapshotResponse)
+        .build();
+    OMSnapshotCreateResponse response = new OMSnapshotCreateResponse(
+        omResponse, volumeName, bucketName, snapshotName);
+
+    // Add the response to the batch, then commit the batch to the store.
+    response.addToDBBatch(omMetadataManager, batchOperation);
+    omMetadataManager.getStore().commitBatchOperation(batchOperation);
+
+    // The snapshot directory must now exist on disk.
+    File snapshotDir = new File(fsPath + OM_KEY_PREFIX + OM_SNAPSHOT_DIR +
+        OM_KEY_PREFIX + OM_DB_NAME + expectedInfo.getCheckpointDirName());
+    Assert.assertTrue(snapshotDir.exists());
+
+    // The table now holds exactly one row: the expected key and value.
+    Assert.assertEquals(1, omMetadataManager.countRowsInTable(
+        omMetadataManager.getSnapshotInfoTable()));
+    Table.KeyValue<String, SnapshotInfo> row =
+        omMetadataManager.getSnapshotInfoTable().iterator().next();
+    SnapshotInfo storedInfo = row.getValue();
+    Assert.assertEquals(expectedInfo.getTableKey(), row.getKey());
+    Assert.assertEquals(expectedInfo, storedInfo);
+  }
+}
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/package-info.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/package-info.java
new file mode 100644
index 0000000000..b8c34e18a5
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/response/snapshot/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+/**
+ * Package contains test classes for snapshot responses.
+ */
+package org.apache.hadoop.ozone.om.response.snapshot;
\ No newline at end of file
diff --git a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
index f023488986..3cab2072e2 100644
--- a/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
+++ b/hadoop-ozone/s3gateway/src/test/java/org/apache/hadoop/ozone/client/ClientProtocolStub.java
@@ -570,4 +570,11 @@ public class ClientProtocolStub implements ClientProtocol {
       String volumeName, String bucketName, String keyName) throws IOException {
     return null;
   }
+
+  /** Stub implementation; always returns an empty string. */
+  @Override
+  public String createSnapshot(String volumeName, String bucketName,
+      String snapshotName) throws IOException {
+    return "";
+  }
 }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/CreateSnapshotHandler.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/CreateSnapshotHandler.java
new file mode 100644
index 0000000000..c92ffec40e
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/CreateSnapshotHandler.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.shell.snapshot;
+
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.shell.Handler;
+import org.apache.hadoop.ozone.shell.OzoneAddress;
+import org.apache.hadoop.ozone.shell.bucket.BucketUri;
+import picocli.CommandLine;
+
+import java.io.IOException;
+
+/**
+ * Handler for the "ozone snapshot create" shell command: creates a
+ * snapshot of the bucket addressed on the command line.
+ */
+@CommandLine.Command(name = "create", description = "create snapshot")
+public class CreateSnapshotHandler extends Handler {
+
+  @CommandLine.Mixin
+  private BucketUri snapshotPath;
+
+  @CommandLine.Parameters(description = "optional snapshot name",
+      index = "1", arity = "0..1")
+  private String snapshotName;
+
+  @Override
+  protected OzoneAddress getAddress() {
+    return snapshotPath.getValue();
+  }
+
+  @Override
+  protected void execute(OzoneClient client, OzoneAddress address)
+      throws IOException {
+    // Volume and bucket are taken from the bucket URI mixin.
+    String volume = snapshotPath.getValue().getVolumeName();
+    String bucket = snapshotPath.getValue().getBucketName();
+    OmUtils.validateSnapshotName(snapshotName);
+    String created = client.getObjectStore()
+        .createSnapshot(volume, bucket, snapshotName);
+    if (!isVerbose()) {
+      return;
+    }
+    // Report the name returned by the object store, not the requested one.
+    out().format("created snapshot '%s/%s %s'.%n", volume, bucket, created);
+  }
+}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotCommands.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotCommands.java
new file mode 100644
index 0000000000..fdb3eff2ba
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/SnapshotCommands.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.shell.snapshot;
+
+import java.util.concurrent.Callable;
+
+import org.apache.hadoop.hdds.cli.GenericParentCommand;
+import org.apache.hadoop.hdds.cli.HddsVersionProvider;
+import org.apache.hadoop.hdds.cli.MissingSubcommandException;
+import org.apache.hadoop.hdds.cli.SubcommandWithParent;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.shell.OzoneShell;
+import org.apache.hadoop.ozone.shell.Shell;
+
+import org.kohsuke.MetaInfServices;
+import picocli.CommandLine.Command;
+import picocli.CommandLine.ParentCommand;
+
+/**
+ * Parent command grouping the snapshot subcommands of the Ozone shell.
+ */
+@Command(name = "snapshot",
+    description = "Snapshot specific operations",
+    subcommands = {CreateSnapshotHandler.class},
+    mixinStandardHelpOptions = true,
+    versionProvider = HddsVersionProvider.class)
+@MetaInfServices(SubcommandWithParent.class)
+public class SnapshotCommands implements GenericParentCommand, Callable<Void>,
+    SubcommandWithParent {
+
+  @ParentCommand
+  private Shell shell;
+
+  // Running "snapshot" itself is an error: a subcommand is required.
+  @Override
+  public Void call() throws Exception {
+    throw new MissingSubcommandException(
+        shell.getCmd().getSubcommands().get("snapshot"));
+  }
+
+  // Verbosity and configuration are both delegated to the parent shell.
+  @Override
+  public boolean isVerbose() {
+    return shell.isVerbose();
+  }
+
+  @Override
+  public OzoneConfiguration createOzoneConfiguration() {
+    return shell.createOzoneConfiguration();
+  }
+
+  @Override
+  public Class<?> getParentType() {
+    return OzoneShell.class;
+  }
+}
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/package-info.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/package-info.java
new file mode 100644
index 0000000000..eb136167e6
--- /dev/null
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/shell/snapshot/package-info.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Commands for Ozone snapshots.
+ */
+package org.apache.hadoop.ozone.shell.snapshot;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org