You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2023/01/26 17:12:44 UTC

[zookeeper] branch master updated: ZOOKEEPER-4571: Admin server API for restore database from a snapshot (#1961)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d35bdfb9d ZOOKEEPER-4571: Admin server API for restore database from a snapshot (#1961)
d35bdfb9d is described below

commit d35bdfb9d3a4bc3d94f22785604418d4f650365d
Author: li4wang <68...@users.noreply.github.com>
AuthorDate: Thu Jan 26 09:12:14 2023 -0800

    ZOOKEEPER-4571: Admin server API for restore database from a snapshot (#1961)
    
    Provides a restore command for restoring database from a snapshot
    
    Author: Li Wang <li...@apple.com>
    
    Co-authored-by: liwang <li...@apple.com>
---
 .../src/main/resources/markdown/zookeeperAdmin.md  |  23 ++-
 .../org/apache/zookeeper/server/ServerMetrics.java |  18 ++
 .../org/apache/zookeeper/server/ZKDatabase.java    |  40 ++++
 .../apache/zookeeper/server/ZooKeeperServer.java   |  82 +++++++-
 .../org/apache/zookeeper/server/admin/Command.java |  26 ++-
 .../apache/zookeeper/server/admin/Commands.java    | 222 +++++++++++++++------
 .../apache/zookeeper/server/admin/GetCommand.java  |  43 ++++
 .../zookeeper/server/admin/JettyAdminServer.java   |  57 +++++-
 .../apache/zookeeper/server/admin/PostCommand.java |  37 ++++
 .../zookeeper/server/persistence/FileSnap.java     |   2 +-
 .../zookeeper/server/persistence/SnapStream.java   |   2 +-
 .../test/java/org/apache/zookeeper/ZKTestCase.java |   4 +-
 .../server/ZookeeperServerRestoreTest.java         | 141 +++++++++++++
 .../ZookeeperServerSnapshotTest.java}              |  55 +++--
 .../zookeeper/server/admin/CommandsTest.java       |  85 ++++++--
 .../server/admin/JettyAdminServerTest.java         |   2 +-
 ...est.java => SnapshotAndRestoreCommandTest.java} | 212 +++++++++++++++++---
 .../apache/zookeeper/test/ObserverMasterTest.java  |   2 +-
 18 files changed, 891 insertions(+), 162 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index b4c01d35d..636c6cde2 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -2115,16 +2115,22 @@ Both subsystems need to have sufficient amount of threads to achieve peak read t
 **New in 3.9.0:** The following
 options are used to configure the [AdminServer](#sc_adminserver).
 
+* *admin.rateLimiterIntervalInMS* :
+  (Java system property: **zookeeper.admin.rateLimiterIntervalInMS**)
+  The time interval, in milliseconds, for rate limiting admin commands to protect the server.
+  Defaults to 300000 (5 mins).
+
 * *admin.snapshot.enabled* :
   (Java system property: **zookeeper.admin.snapshot.enabled**)
   The flag for enabling the snapshot command. Defaults to false. 
   It will be enabled by default once the auth support for admin server commands 
   is available.
-
-* *admin.snapshot.intervalInMS* :
-  (Java system property: **zookeeper.admin.snapshot.intervalInMS**)
-  The time interval for rate limiting snapshot command to protect the server.
-  Defaults to 5 mins.
+  
+* *admin.restore.enabled* :
+  (Java system property: **zookeeper.admin.restore.enabled**)
+  The flag for enabling the restore command. Defaults to false.
+  It will be enabled by default once the auth support for admin server commands
+  is available.
 
 **New in 3.7.1:** The following
 options are used to configure the [AdminServer](#sc_adminserver).
@@ -2641,6 +2647,13 @@ Available commands include:
     Reset all observer connection statistics. Companion command to *observers*.
     No new fields returned.
 
+* *restore/rest* :
+  Restore the database on the current server from a snapshot sent as the HTTP POST request body.
+  Returns the following data in response payload:
+  "last_zxid": String
+  Note: this API is rate-limited (once every 5 mins by default) to protect the server
+  from being overloaded.
+
 * *ruok* :
     No-op command, check if the server is running.
     A response does not necessarily indicate that the
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
index ef28e32e1..e3476ff6d 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java
@@ -74,6 +74,9 @@ public final class ServerMetrics {
         SNAPSHOT_TIME = metricsContext.getSummary("snapshottime", DetailLevel.BASIC);
         SNAPSHOT_ERROR_COUNT = metricsContext.getCounter("snapshot_error_count");
         SNAPSHOT_RATE_LIMITED_COUNT = metricsContext.getCounter("snapshot_rate_limited_count");
+        RESTORE_TIME = metricsContext.getSummary("restore_time", DetailLevel.BASIC);
+        RESTORE_ERROR_COUNT = metricsContext.getCounter("restore_error_count");
+        RESTORE_RATE_LIMITED_COUNT = metricsContext.getCounter("restore_rate_limited_count");
         DB_INIT_TIME = metricsContext.getSummary("dbinittime", DetailLevel.BASIC);
         READ_LATENCY = metricsContext.getSummary("readlatency", DetailLevel.ADVANCED);
         UPDATE_LATENCY = metricsContext.getSummary("updatelatency", DetailLevel.ADVANCED);
@@ -288,6 +291,21 @@ public final class ServerMetrics {
      */
     public final Counter SNAPSHOT_RATE_LIMITED_COUNT;
 
+    /**
+     * Restore time
+     */
+    public final Summary RESTORE_TIME;
+
+    /**
+     * Restore error count
+     */
+    public final Counter RESTORE_ERROR_COUNT;
+
+    /**
+     * Restore rate limited count
+     */
+    public final Counter RESTORE_RATE_LIMITED_COUNT;
+
     /**
      * Db init time (snapshot loading + txnlog replay)
      */
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
index 05ada8cee..f20af88d5 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZKDatabase.java
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import java.util.zip.CheckedInputStream;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.Record;
@@ -48,8 +49,10 @@ import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
+import org.apache.zookeeper.server.persistence.FileSnap;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
+import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.server.persistence.TxnLog.TxnIterator;
 import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
@@ -620,6 +623,43 @@ public class ZKDatabase {
         initialized = true;
     }
 
+    /**
+     * Deserialize a snapshot that contains FileHeader from an input archive, after first calling
+     * clear() on this database. It is used by the admin restore command.
+     *
+     * @param ia the input archive to deserialize from
+     * @param is the CheckedInputStream passed to the seal integrity checks
+     *
+     * @throws IOException propagated from the deserialization and integrity-check calls
+     */
+    public void deserializeSnapshot(final InputArchive ia, final CheckedInputStream is) throws IOException {
+        clear();
+
+        // deserialize data tree
+        final DataTree dataTree = getDataTree();
+        final FileSnap filesnap = new FileSnap(snapLog.getSnapDir());
+        filesnap.deserialize(dataTree, getSessionWithTimeOuts(), ia);
+        SnapStream.checkSealIntegrity(is, ia);
+
+        // deserialize digest and check integrity
+        if (dataTree.deserializeZxidDigest(ia, 0)) {
+            SnapStream.checkSealIntegrity(is, ia);
+        }
+
+        // deserialize lastProcessedZxid and check integrity
+        if (dataTree.deserializeLastProcessedZxid(ia)) {
+            SnapStream.checkSealIntegrity(is, ia);
+        }
+
+        // compare the digest to find inconsistency
+        if (dataTree.getDigestFromLoadedSnapshot() != null) {
+            dataTree.compareSnapshotDigests(dataTree.lastProcessedZxid);
+        }
+
+        initialized = true;
+    }
+
+
     /**
      * serialize the snapshot
      * @param oa the output archive to which the snapshot needs to be serialized
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index d61f269c9..bbb0f7fee 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -19,9 +19,11 @@
 package org.apache.zookeeper.server;
 
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
+import java.io.BufferedInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.util.ArrayDeque;
@@ -34,11 +36,16 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiConsumer;
+import java.util.zip.Adler32;
+import java.util.zip.CheckedInputStream;
 import javax.security.sasl.SaslException;
+import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.InputArchive;
 import org.apache.jute.Record;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.KeeperException;
@@ -127,6 +134,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     // this feature is confirmed to be stable
     public static final String CLOSE_SESSION_TXN_ENABLED = "zookeeper.closeSessionTxn.enabled";
     private static boolean closeSessionTxnEnabled = true;
+    private volatile CountDownLatch restoreLatch;
 
     static {
         LOG = LoggerFactory.getLogger(ZooKeeperServer.class);
@@ -541,12 +549,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         takeSnapshot();
     }
 
-    public void takeSnapshot() throws IOException {
-        takeSnapshot(false);
+    public File takeSnapshot() throws IOException {
+        return takeSnapshot(false);
     }
 
-    public void takeSnapshot(boolean syncSnap) throws IOException {
-        takeSnapshot(syncSnap, true, false);
+    public File takeSnapshot(boolean syncSnap) throws IOException {
+        return takeSnapshot(syncSnap, true, false);
     }
 
     /**
@@ -583,6 +591,61 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return snapFile;
     }
 
+    /**
+     * Restores database from a snapshot. It is used by the restore admin server command.
+     *
+     * @param inputStream input stream of snapshot; an IllegalArgumentException is thrown if null
+     * @return last processed zxid
+     * @throws IOException if thrown while deserializing the snapshot
+     */
+    public synchronized long restoreFromSnapshot(final InputStream inputStream) throws IOException {
+        if (inputStream == null) {
+            throw new IllegalArgumentException("InputStream can not be null when restoring from snapshot");
+        }
+
+        long start = Time.currentElapsedTime();
+        LOG.info("Before restore database. lastProcessedZxid={}, nodeCount={},sessionCount={}",
+            getZKDatabase().getDataTreeLastProcessedZxid(),
+            getZKDatabase().dataTree.getNodeCount(),
+            getZKDatabase().getSessionCount());
+
+        // restore to a new zkDatabase, leaving the current one untouched until the swap below
+        final ZKDatabase newZKDatabase = new ZKDatabase(this.txnLogFactory);
+        final CheckedInputStream cis = new CheckedInputStream(new BufferedInputStream(inputStream), new Adler32());
+        final InputArchive ia = BinaryInputArchive.getArchive(cis);
+        newZKDatabase.deserializeSnapshot(ia, cis);
+        LOG.info("Restored to a new database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+            newZKDatabase.getDataTreeLastProcessedZxid(),
+            newZKDatabase.dataTree.getNodeCount(),
+            newZKDatabase.getSessionCount());
+
+        // create a CountDownLatch; submitRequest() waits on it while the field is non-null
+        restoreLatch = new CountDownLatch(1);
+
+        try {
+            // set to the new zkDatabase
+            setZKDatabase(newZKDatabase);
+
+            // re-create SessionTracker
+            createSessionTracker();
+        } finally {
+            // unblock request submission
+            restoreLatch.countDown();
+            restoreLatch = null;
+        }
+
+        LOG.info("After restore database. lastProcessedZxid={}, nodeCount={}, sessionCount={}",
+                getZKDatabase().getDataTreeLastProcessedZxid(),
+                getZKDatabase().dataTree.getNodeCount(),
+                getZKDatabase().getSessionCount());
+
+        long elapsed = Time.currentElapsedTime() - start;
+        LOG.info("Restore taken in {} ms", elapsed);
+        ServerMetrics.getMetrics().RESTORE_TIME.add(elapsed);
+
+        return getLastProcessedZxid();
+    }
+
     public boolean shouldForceWriteInitialSnapshotAfterLeaderElection() {
         return txnLogFactory.shouldForceWriteInitialSnapshotAfterLeaderElection();
     }
@@ -826,6 +889,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
      * <li>During shutdown the server sets the state to SHUTDOWN, which
      * corresponds to the server not running.</li></ul>
      *
+     * <ul><li>During maintenance (e.g. restore) the server sets the state to MAINTENANCE
+     * </li></ul>
+     *
      * @param state new server state.
      */
     protected void setState(State state) {
@@ -1151,6 +1217,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     public void submitRequest(Request si) {
+        // Read the volatile field once: restoreFromSnapshot() sets it back to null when the
+        // restore ends, so testing the field and then re-reading it could dereference null.
+        final CountDownLatch latch = restoreLatch;
+        if (latch != null) {
+            try {
+                LOG.info("Blocking request submission while restore is in progress");
+                latch.await();
+            } catch (final InterruptedException e) {
+                LOG.warn("Unexpected interruption", e);
+            }
+        }
         enqueueRequest(si);
     }
 
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
index b422715bf..5d06356b2 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.admin;
 
+import java.io.InputStream;
 import java.util.Map;
 import java.util.Set;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -57,19 +58,34 @@ public interface Command {
     boolean isServerRequired();
 
     /**
-     * Run this command. Commands take a ZooKeeperServer and String-valued
-     * keyword arguments and return a map containing any information
+     * Run this command for HTTP GET request. Commands take a ZooKeeperServer, String-valued keyword
+     * arguments and return a CommandResponse object containing any information
      * constituting the response to the command. Commands are responsible for
      * parsing keyword arguments and performing any error handling if necessary.
      * Errors should be reported by setting the "error" entry of the returned
      * map with an appropriate message rather than throwing an exception.
      *
-     * @param zkServer
+     * @param zkServer ZooKeeper server
      * @param kwargs keyword -&gt; argument value mapping
-     * @return Map representing response to command containing at minimum:
+     * @return CommandResponse representing response to command containing at minimum:
      *    - "command" key containing the command's primary name
      *    - "error" key containing a String error message or null if no error
      */
-    CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs);
+    CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs);
 
+    /**
+     * Run this command for an HTTP POST request. Commands take a ZooKeeperServer and an
+     * InputStream and return a CommandResponse object containing any information
+     * constituting the response to the command. Commands are responsible for
+     * reading the input stream and performing any error handling if necessary.
+     * Errors should be reported by setting the "error" entry of the returned
+     * CommandResponse with an appropriate message rather than throwing an exception.
+     *
+     * @param zkServer ZooKeeper server
+     * @param inputStream InputStream from request
+     * @return CommandResponse representing response to command containing at minimum:
+     *    - "command" key containing the command's primary name
+     *    - "error" key containing a String error message or null if no error
+     */
+     CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream);
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index 848583a0f..1911b5a57 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.InputStream;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collections;
@@ -73,6 +74,8 @@ import org.slf4j.LoggerFactory;
 public class Commands {
 
     static final Logger LOG = LoggerFactory.getLogger(Commands.class);
+    static final String ADMIN_RATE_LIMITER_INTERVAL = "zookeeper.admin.rateLimiterIntervalInMS";
+    private static final long rateLimiterInterval = Integer.parseInt(System.getProperty(ADMIN_RATE_LIMITER_INTERVAL, "300000"));
 
     /** Maps command names to Command instances */
     private static Map<String, Command> commands = new HashMap<String, Command>();
@@ -100,16 +103,16 @@ public class Commands {
      *
      * @param cmdName
      * @param zkServer
-     * @param kwargs String-valued keyword arguments to the command
+     * @param kwargs String-valued keyword arguments to the command from HTTP GET request
      *        (may be null if command requires no additional arguments)
      * @return Map representing response to command containing at minimum:
      *    - "command" key containing the command's primary name
      *    - "error" key containing a String error message or null if no error
      */
-    public static CommandResponse runCommand(
-        String cmdName,
-        ZooKeeperServer zkServer,
-        Map<String, String> kwargs) {
+    public static CommandResponse runGetCommand(
+            String cmdName,
+            ZooKeeperServer zkServer,
+            Map<String, String> kwargs) {
         Command command = getCommand(cmdName);
         if (command == null) {
             // set the status code to 200 to keep the current behavior of existing commands
@@ -119,7 +122,36 @@ public class Commands {
             // set the status code to 200 to keep the current behavior of existing commands
             return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
         }
-        return command.run(zkServer, kwargs);
+        return command.runGet(zkServer, kwargs);
+    }
+
+    /**
+     * Run the registered command with name cmdName for an HTTP POST request. Commands should not
+     * produce any exceptions; any (anticipated) errors should be reported in the "error" entry
+     * of the returned response. Likewise, if no command with the given name is registered, or
+     * the command requires a server and none is running, this is noted in the "error" entry.
+     *
+     * @param cmdName name the command was registered under
+     * @param zkServer server to run against; if null, commands that require a server get an error
+     * @param inputStream InputStream from HTTP POST request
+     * @return CommandResponse representing response to command containing at minimum:
+     *    - "command" key containing the command's primary name
+     *    - "error" key containing a String error message or null if no error
+     */
+    public static CommandResponse runPostCommand(
+            String cmdName,
+            ZooKeeperServer zkServer,
+            InputStream inputStream) {
+        Command command = getCommand(cmdName);
+        if (command == null) {
+            // set the status code to 200 to keep the current behavior of existing commands
+            return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
+        }
+        if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
+            // set the status code to 200 to keep the current behavior of existing commands
+            return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
+        }
+        return command.runPost(zkServer, inputStream);
     }
 
     /**
@@ -152,6 +184,7 @@ public class Commands {
         registerCommand(new LeaderCommand());
         registerCommand(new MonitorCommand());
         registerCommand(new ObserverCnxnStatResetCommand());
+        registerCommand(new RestoreCommand());
         registerCommand(new RuokCommand());
         registerCommand(new SetTraceMaskCommand());
         registerCommand(new SnapshotCommand());
@@ -170,14 +203,14 @@ public class Commands {
     /**
      * Reset all connection statistics.
      */
-    public static class CnxnStatResetCommand extends CommandBase {
+    public static class CnxnStatResetCommand extends GetCommand {
 
         public CnxnStatResetCommand() {
             super(Arrays.asList("connection_stat_reset", "crst"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             zkServer.getServerCnxnFactory().resetAllConnectionStats();
             return response;
@@ -190,14 +223,14 @@ public class Commands {
      * Server configuration parameters.
      * @see ZooKeeperServer#getConf()
      */
-    public static class ConfCommand extends CommandBase {
+    public static class ConfCommand extends GetCommand {
 
         public ConfCommand() {
             super(Arrays.asList("configuration", "conf", "config"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.putAll(zkServer.getConf().toMap());
             return response;
@@ -210,14 +243,14 @@ public class Commands {
      *   - "connections": list of connection info objects
      * @see org.apache.zookeeper.server.ServerCnxn#getConnectionInfo(boolean)
      */
-    public static class ConsCommand extends CommandBase {
+    public static class ConsCommand extends GetCommand {
 
         public ConsCommand() {
             super(Arrays.asList("connections", "cons"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             ServerCnxnFactory serverCnxnFactory = zkServer.getServerCnxnFactory();
             if (serverCnxnFactory != null) {
@@ -239,14 +272,14 @@ public class Commands {
     /**
      * Information on ZK datadir and snapdir size in bytes
      */
-    public static class DirsCommand extends CommandBase {
+    public static class DirsCommand extends GetCommand {
 
         public DirsCommand() {
             super(Arrays.asList("dirs"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.put("datadir_size", zkServer.getDataDirSize());
             response.put("logdir_size", zkServer.getLogDirSize());
@@ -264,14 +297,14 @@ public class Commands {
      * @see ZooKeeperServer#getSessionExpiryMap()
      * @see ZooKeeperServer#getEphemerals()
      */
-    public static class DumpCommand extends CommandBase {
+    public static class DumpCommand extends GetCommand {
 
         public DumpCommand() {
             super(Arrays.asList("dump"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.put("expiry_time_to_session_ids", zkServer.getSessionExpiryMap());
             response.put("session_id_to_ephemeral_paths", zkServer.getEphemerals());
@@ -283,14 +316,14 @@ public class Commands {
     /**
      * All defined environment variables.
      */
-    public static class EnvCommand extends CommandBase {
+    public static class EnvCommand extends GetCommand {
 
         public EnvCommand() {
             super(Arrays.asList("environment", "env", "envi"), false);
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             for (Entry e : Environment.list()) {
                 response.put(e.getKey(), e.getValue());
@@ -303,14 +336,14 @@ public class Commands {
     /**
      * Digest histories for every specific number of txns.
      */
-    public static class DigestCommand extends CommandBase {
+    public static class DigestCommand extends GetCommand {
 
         public DigestCommand() {
             super(Arrays.asList("hash"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.put("digests", zkServer.getZKDatabase().getDataTree().getDigestLog());
             return response;
@@ -322,14 +355,14 @@ public class Commands {
      * The current trace mask. Returned map contains:
      *   - "tracemask": Long
      */
-    public static class GetTraceMaskCommand extends CommandBase {
+    public static class GetTraceMaskCommand extends GetCommand {
 
         public GetTraceMaskCommand() {
             super(Arrays.asList("get_trace_mask", "gtmk"), false);
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.put("tracemask", ZooTrace.getTextTraceLevel());
             return response;
@@ -337,14 +370,14 @@ public class Commands {
 
     }
 
-    public static class InitialConfigurationCommand extends CommandBase {
+    public static class InitialConfigurationCommand extends GetCommand {
 
         public InitialConfigurationCommand() {
             super(Arrays.asList("initial_configuration", "icfg"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.put("initial_configuration", zkServer.getInitialConfig());
             return response;
@@ -356,14 +389,14 @@ public class Commands {
      * Is this server in read-only mode. Returned map contains:
      *   - "is_read_only": Boolean
      */
-    public static class IsroCommand extends CommandBase {
+    public static class IsroCommand extends GetCommand {
 
         public IsroCommand() {
             super(Arrays.asList("is_read_only", "isro"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             response.put("read_only", zkServer instanceof ReadOnlyZooKeeperServer);
             return response;
@@ -380,14 +413,14 @@ public class Commands {
      *   - "zxid": String
      *   - "timestamp": Long
      */
-    public static class LastSnapshotCommand extends CommandBase {
+    public static class LastSnapshotCommand extends GetCommand {
 
         public LastSnapshotCommand() {
             super(Arrays.asList("last_snapshot", "lsnp"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             SnapshotInfo info = zkServer.getTxnLogFactory().getLastSnapshotInfo();
             response.put("zxid", Long.toHexString(info == null ? -1L : info.zxid));
@@ -400,14 +433,14 @@ public class Commands {
     /**
      * Returns the leader status of this instance and the leader host string.
      */
-    public static class LeaderCommand extends CommandBase {
+    public static class LeaderCommand extends GetCommand {
 
         public LeaderCommand() {
             super(Arrays.asList("leader", "lead"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             if (zkServer instanceof QuorumZooKeeperServer) {
                 response.put("is_leader", zkServer instanceof LeaderZooKeeperServer);
@@ -450,14 +483,14 @@ public class Commands {
      *   - "synced_followers": Integer (leader only)
      *   - "pending_syncs": Integer (leader only)
      */
-    public static class MonitorCommand extends CommandBase {
+    public static class MonitorCommand extends GetCommand {
 
         public MonitorCommand() {
             super(Arrays.asList("monitor", "mntr"), false);
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             zkServer.dumpMonitorValues(response::put);
             ServerMetrics.getMetrics().getMetricsProvider().dump(response::put);
@@ -470,14 +503,14 @@ public class Commands {
     /**
      * Reset all observer connection statistics.
      */
-    public static class ObserverCnxnStatResetCommand extends CommandBase {
+    public static class ObserverCnxnStatResetCommand extends GetCommand {
 
         public ObserverCnxnStatResetCommand() {
             super(Arrays.asList("observer_connection_stat_reset", "orst"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             if (zkServer instanceof LeaderZooKeeperServer) {
                 Leader leader = ((LeaderZooKeeperServer) zkServer).getLeader();
@@ -491,17 +524,81 @@ public class Commands {
 
     }
 
+    /**
+     * Restores the database of the current server from a snapshot supplied as the POST payload.
+     * A failed precondition (feature flag, zxid serialization, payload, rate limit) only sets an error status.
+     * Returned map contains:
+     *  - "last_zxid": Long (the zxid returned by ZooKeeperServer#restoreFromSnapshot)
+     */
+    public static class RestoreCommand extends PostCommand {
+        static final String RESPONSE_DATA_LAST_ZXID = "last_zxid";
+
+        static final String ADMIN_RESTORE_ENABLED = "zookeeper.admin.restore.enabled";
+
+        // rateLimiterInterval is presumably read from "zookeeper.admin.rateLimiterIntervalInMS" (milliseconds)
+        private final RateLimiter rateLimiter;
+
+        public RestoreCommand() {
+            super(Arrays.asList("restore", "rest"));
+            rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MILLISECONDS);
+        }
+
+        @Override
+        public CommandResponse runPost(final ZooKeeperServer zkServer, final InputStream inputStream) {
+            final CommandResponse response = initializeResponse();
+
+            // the feature stays off unless zookeeper.admin.restore.enabled is set to true
+            final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "false"));
+            if (!restoreEnabled) {
+                response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
+                LOG.warn("Restore command is disabled");
+                return response;
+            }
+
+            if (!zkServer.isSerializeLastProcessedZxidEnabled()) {
+                response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                LOG.warn("Restore command requires serializeLastProcessedZxidEnable flag is set to true");
+                return response;
+            }
+
+            if (inputStream == null) {
+                response.setStatusCode(HttpServletResponse.SC_BAD_REQUEST);
+                LOG.warn("InputStream from restore request is null");
+                return response;
+            }
+
+            // answer 429 and bump the rate-limited metric when the limiter refuses the request
+            if (!rateLimiter.allow()) {
+                response.setStatusCode(HttpStatus.TOO_MANY_REQUESTS_429);
+                ServerMetrics.getMetrics().RESTORE_RATE_LIMITED_COUNT.add(1);
+                LOG.warn("Restore request was rate limited");
+                return response;
+            }
+
+            // restore from the payload; any failure becomes a 500 and is counted in RESTORE_ERROR_COUNT
+            try {
+                final long lastZxid = zkServer.restoreFromSnapshot(inputStream);
+                response.put(RESPONSE_DATA_LAST_ZXID, lastZxid);
+            } catch (final Exception e) {
+                response.setStatusCode(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+                ServerMetrics.getMetrics().RESTORE_ERROR_COUNT.add(1);
+                LOG.warn("Exception occurred when restore snapshot via the restore command", e);
+            }
+            return response;
+        }
+    }
+
     /**
      * No-op command, check if the server is running
      */
-    public static class RuokCommand extends CommandBase {
+    public static class RuokCommand extends GetCommand {
 
         public RuokCommand() {
             super(Arrays.asList("ruok"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             return initializeResponse();
         }
 
@@ -513,14 +610,14 @@ public class Commands {
      *  Returned Map contains:
      *   - "tracemask": Long
      */
-    public static class SetTraceMaskCommand extends CommandBase {
+    public static class SetTraceMaskCommand extends GetCommand {
 
         public SetTraceMaskCommand() {
             super(Arrays.asList("set_trace_mask", "stmk"), false);
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             long traceMask;
             if (!kwargs.containsKey("traceMask")) {
@@ -551,28 +648,25 @@ public class Commands {
      *   - "last_zxid": String
      *   - "snapshot_size": String
      */
-    public static class SnapshotCommand extends CommandBase {
+    public static class SnapshotCommand extends GetCommand {
         static final String REQUEST_QUERY_PARAM_STREAMING = "streaming";
 
         static final String RESPONSE_HEADER_LAST_ZXID = "last_zxid";
         static final String RESPONSE_HEADER_SNAPSHOT_SIZE = "snapshot_size";
 
         static final String ADMIN_SNAPSHOT_ENABLED = "zookeeper.admin.snapshot.enabled";
-        static final String ADMIN_SNAPSHOT_INTERVAL = "zookeeper.admin.snapshot.intervalInMS";
-
-        private static final long snapshotInterval = Integer.parseInt(System.getProperty(ADMIN_SNAPSHOT_INTERVAL, "300000"));
 
         private final RateLimiter rateLimiter;
 
         public SnapshotCommand() {
             super(Arrays.asList("snapshot", "snap"));
-            rateLimiter = new RateLimiter(1, snapshotInterval, TimeUnit.MICROSECONDS);
+            rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MILLISECONDS); // interval setting is in ms
         }
 
         @SuppressFBWarnings(value = "OBL_UNSATISFIED_OBLIGATION",
                 justification = "FileInputStream is passed to CommandResponse and closed in StreamOutputter")
         @Override
-        public CommandResponse run(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
+        public CommandResponse runGet(final ZooKeeperServer zkServer, final Map<String, String> kwargs) {
             final CommandResponse response = initializeResponse();
 
             // check feature flag
@@ -637,7 +731,7 @@ public class Commands {
      *   - "server_stats": ServerStats object
      *   - "node_count": Integer
      */
-    public static class SrvrCommand extends CommandBase {
+    public static class SrvrCommand extends GetCommand {
 
         public SrvrCommand() {
             super(Arrays.asList("server_stats", "srvr"));
@@ -649,7 +743,7 @@ public class Commands {
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             LOG.info("running stat");
             response.put("version", Version.getFullVersion());
@@ -676,8 +770,8 @@ public class Commands {
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
-            CommandResponse response = super.run(zkServer, kwargs);
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            CommandResponse response = super.runGet(zkServer, kwargs);
 
             final Iterable<Map<String, Object>> connections;
             if (zkServer.getServerCnxnFactory() != null) {
@@ -702,14 +796,14 @@ public class Commands {
     /**
      * Resets server statistics.
      */
-    public static class StatResetCommand extends CommandBase {
+    public static class StatResetCommand extends GetCommand {
 
         public StatResetCommand() {
             super(Arrays.asList("stat_reset", "srst"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             zkServer.serverStats().reset();
             return response;
@@ -723,14 +817,14 @@ public class Commands {
      *   - "observers": list of observer learner handler info objects (leader/follower only)
      * @see org.apache.zookeeper.server.quorum.LearnerHandler#getLearnerHandlerInfo()
      */
-    public static class SyncedObserverConsCommand extends CommandBase {
+    public static class SyncedObserverConsCommand extends GetCommand {
 
         public SyncedObserverConsCommand() {
             super(Arrays.asList("observers", "obsr"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
 
             CommandResponse response = initializeResponse();
 
@@ -760,14 +854,14 @@ public class Commands {
     /**
      * All defined system properties.
      */
-    public static class SystemPropertiesCommand extends CommandBase {
+    public static class SystemPropertiesCommand extends GetCommand {
 
         public SystemPropertiesCommand() {
             super(Arrays.asList("system_properties", "sysp"), false);
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             Properties systemProperties = System.getProperties();
             SortedMap<String, String> sortedSystemProperties = new TreeMap<>();
@@ -782,14 +876,14 @@ public class Commands {
      * Returns the current ensemble configuration information.
      * It provides list of current voting members in the ensemble.
      */
-    public static class VotingViewCommand extends CommandBase {
+    public static class VotingViewCommand extends GetCommand {
 
         public VotingViewCommand() {
             super(Arrays.asList("voting_view"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             if (zkServer instanceof QuorumZooKeeperServer) {
                 QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
@@ -850,14 +944,14 @@ public class Commands {
      * @see DataTree#getWatches()
      * @see DataTree#getWatches()
      */
-    public static class WatchCommand extends CommandBase {
+    public static class WatchCommand extends GetCommand {
 
         public WatchCommand() {
             super(Arrays.asList("watches", "wchc"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             CommandResponse response = initializeResponse();
             response.put("session_id_to_watched_paths", dt.getWatches().toMap());
@@ -871,14 +965,14 @@ public class Commands {
      *   - "path_to_session_ids": Map&lt;String, Set&lt;Long&gt;&gt; path -&gt; session IDs of sessions watching path
      * @see DataTree#getWatchesByPath()
      */
-    public static class WatchesByPathCommand extends CommandBase {
+    public static class WatchesByPathCommand extends GetCommand {
 
         public WatchesByPathCommand() {
             super(Arrays.asList("watches_by_path", "wchp"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             CommandResponse response = initializeResponse();
             response.put("path_to_session_ids", dt.getWatchesByPath().toMap());
@@ -891,14 +985,14 @@ public class Commands {
      * Summarized watch information.
      * @see DataTree#getWatchesSummary()
      */
-    public static class WatchSummaryCommand extends CommandBase {
+    public static class WatchSummaryCommand extends GetCommand {
 
         public WatchSummaryCommand() {
             super(Arrays.asList("watch_summary", "wchs"));
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             DataTree dt = zkServer.getZKDatabase().getDataTree();
             CommandResponse response = initializeResponse();
             response.putAll(dt.getWatchesSummary().toMap());
@@ -911,14 +1005,14 @@ public class Commands {
      * Returns the current phase of Zab protocol that peer is running.
      * It can be in one of these phases: ELECTION, DISCOVERY, SYNCHRONIZATION, BROADCAST
      */
-    public static class ZabStateCommand extends CommandBase {
+    public static class ZabStateCommand extends GetCommand {
 
         public ZabStateCommand() {
             super(Arrays.asList("zabstate"), false);
         }
 
         @Override
-        public CommandResponse run(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
             CommandResponse response = initializeResponse();
             if (zkServer instanceof QuorumZooKeeperServer) {
                 QuorumPeer peer = ((QuorumZooKeeperServer) zkServer).self;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
new file mode 100644
index 000000000..509cad72e
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.io.InputStream;
+import java.util.List;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+/**
+ * Base class for admin commands that are served through HTTP GET requests.
+ * Subclasses provide runGet; the runPost entry point is stubbed out below.
+ */
+public abstract class GetCommand extends CommandBase {
+
+    protected GetCommand(List<String> names) {
+        super(names);
+    }
+
+    protected GetCommand(List<String> names, boolean serverRequired) {
+        super(names, serverRequired);
+    }
+
+    @Override
+    public CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream) {
+        return null; // GET-only command: no POST handling, callers get null
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
index 3c82f8552..effececdc 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
@@ -236,6 +236,7 @@ public class JettyAdminServer implements AdminServer {
 
         private static final long serialVersionUID = 1L;
 
+        @Override
         protected void doGet(
             HttpServletRequest request,
             HttpServletResponse response) throws ServletException, IOException {
@@ -260,7 +261,7 @@ public class JettyAdminServer implements AdminServer {
             }
 
             // Run the command
-            final CommandResponse cmdResponse = Commands.runCommand(cmd, zkServer, kwargs);
+            final CommandResponse cmdResponse = Commands.runGetCommand(cmd, zkServer, kwargs);
             response.setStatus(cmdResponse.getStatusCode());
 
             final Map<String, String> headers = cmdResponse.getHeaders();
@@ -280,6 +281,60 @@ public class JettyAdminServer implements AdminServer {
                 outputter.output(cmdResponse, response.getOutputStream());
             }
         }
+
+        /**
+         * Handles HTTP POST requests by dispatching to the command named in the URL path.
+         * The request body is handed to the command untouched as an InputStream, so
+         * every command decides how to interpret it (RestoreCommand, for instance,
+         * reads snapshot data straight from that stream).
+         */
+        @Override
+        protected void doPost(final HttpServletRequest request,
+                              final HttpServletResponse response) throws ServletException, IOException {
+            final String commandName = extractCommandNameFromURL(request, response);
+            if (commandName == null) {
+                return;
+            }
+            final CommandResponse result = Commands.runPostCommand(commandName, zkServer, request.getInputStream());
+            sendJSONResponse(response, result, IPAuthenticationProvider.getClientIPAddress(request));
+        }
+
+        /**
+         * Returns the command name from the request path; prints the command links and returns null if there is none.
+         */
+        private String extractCommandNameFromURL(final HttpServletRequest request,
+                                                 final HttpServletResponse response) throws IOException {
+            final String pathInfo = request.getPathInfo();
+            if (pathInfo != null && !pathInfo.equals("/")) {
+                // Drop the leading "/" to obtain the bare command name
+                return pathInfo.substring(1);
+            }
+            printCommandLinks(response);
+            return null;
+        }
+
+        /**
+         * Writes every entry of commandLinks() to the response writer, each followed by an HTML line break.
+         */
+        private void printCommandLinks(final HttpServletResponse response) throws IOException {
+            for (final String link : commandLinks()) {
+                response.getWriter().println(link);
+                response.getWriter().println("<br/>");
+            }
+        }
+
+        /**
+         * Writes the command response, with its status code, to the servlet response through a JsonOutputter.
+         */
+        private void sendJSONResponse(final HttpServletResponse response,
+                                      final CommandResponse cmdResponse,
+                                      final String clientIP) throws IOException {
+            final CommandOutputter jsonOutputter = new JsonOutputter(clientIP);
+            // status and content type are set before the body is written
+            response.setStatus(cmdResponse.getStatusCode());
+            response.setContentType(jsonOutputter.getContentType());
+            jsonOutputter.output(cmdResponse, response.getWriter());
+        }
     }
 
     /**
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
new file mode 100644
index 000000000..cb346b2e0
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+/**
+ * Base class for admin commands served through HTTP POST requests; runGet is stubbed out below.
+ */
+public abstract class PostCommand extends CommandBase {
+    protected PostCommand(List<String> names) {
+        super(names);
+    }
+
+    @Override
+    public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+        return null; // POST-only command: no GET handling, callers get null
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
index 1a91d1c30..f162f0cc6 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/FileSnap.java
@@ -99,7 +99,7 @@ public class FileSnap implements SnapShot {
                     SnapStream.checkSealIntegrity(snapIS, ia);
                 }
 
-                // deserialize the last processed zxid and check the intact
+                // deserialize lastProcessedZxid and check inconsistency
                 if (dt.deserializeLastProcessedZxid(ia)) {
                     SnapStream.checkSealIntegrity(snapIS, ia);
                 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
index 9bc8ee519..847b12c28 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/persistence/SnapStream.java
@@ -170,7 +170,7 @@ public class SnapStream {
      * the checkSum of the content.
      *
      */
-    static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
+    public static void checkSealIntegrity(CheckedInputStream is, InputArchive ia) throws IOException {
         long checkSum = is.getChecksum().getValue();
         long val = ia.readLong("val");
         ia.readString("path");  // Read and ignore "/" written by SealStream.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
index 826a875b4..8d9430e1c 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java
@@ -72,8 +72,8 @@ public class ZKTestCase {
         // same port.
         System.setProperty("zookeeper.admin.enableServer", "false");
 
-        // disable rate limiting on the snapshot admin API
-        System.setProperty("zookeeper.admin.snapshot.intervalInMS", "0");
+        // disable rate limiting
+        System.setProperty("zookeeper.admin.rateLimiterIntervalInMS", "0");
 
         // ZOOKEEPER-2693 disables all 4lw by default.
         // Here we enable the 4lw which ZooKeeper tests depends.
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java
new file mode 100644
index 000000000..85f514dd7
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerRestoreTest.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server;
+
+import static org.apache.zookeeper.server.persistence.FileSnap.SNAPSHOT_FILE_PREFIX;
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.util.Set;
+import java.util.zip.CheckedInputStream;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.persistence.SnapStream;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+public class ZookeeperServerRestoreTest extends ZKTestCase {
+    private static final String BASE_PATH = "/restoreFromSnapshotTest";
+    private static final int NODE_COUNT = 10;
+    private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique();
+
+    @TempDir
+    static File dataDir;
+
+    @TempDir
+    static File logDir;
+
+    @Test
+    public void testRestoreFromSnapshot() throws Exception {
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+
+        final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        final int port = Integer.parseInt(HOST_PORT.split(":")[1]);
+        final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+
+        ZooKeeper zk1 = null;
+        ZooKeeper zk2 = null;
+        ZooKeeper zk3 = null;
+
+        try {
+            // start the server and wait until it accepts connections
+            serverCnxnFactory.startup(zks);
+            assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
+
+            // zk1 creates the first NODE_COUNT nodes; these end up in the snapshot
+            zk1 = ClientBase.createZKClient(HOST_PORT);
+            for (int i = 0; i < NODE_COUNT; i++) {
+                final String path = BASE_PATH + "-" + i;
+                zk1.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // take the snapshot and read its zxid back from the snapshot file name
+            final File snapshotFile = zks.takeSnapshot(false, false, true);
+            final long lastZxidFromSnapshot = Util.getZxidFromName(snapshotFile.getName(), SNAPSHOT_FILE_PREFIX);
+
+            // zk2 creates NODE_COUNT more nodes after the snapshot was taken
+            zk2 = ClientBase.createZKClient(HOST_PORT);
+            for (int i = NODE_COUNT; i < NODE_COUNT * 2; i++) {
+                final String path = BASE_PATH + "-" + i;
+                zk2.create(path, String.valueOf(i).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // restore the running server from the snapshot file
+            try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile)) {
+                final long lastZxidFromRestore = zks.restoreFromSnapshot(is);
+
+                // the restore must report the zxid encoded in the snapshot file name
+                assertEquals(lastZxidFromSnapshot, lastZxidFromRestore);
+
+                // validate restored data only contains data from snapshot
+                zk3 = ClientBase.createZKClient(HOST_PORT);
+                for (int i = 0; i < NODE_COUNT; i++) {
+                    final String path = BASE_PATH + "-" + i;
+                    final String expectedData = String.valueOf(i);
+                    assertArrayEquals(expectedData.getBytes(), zk3.getData(path, null, null));
+                }
+                assertEquals(NODE_COUNT + 3, zk3.getAllChildrenNumber("/")); // 3 extra: presumably system znodes, TODO confirm
+
+                // validate sessions: zk1 (in the snapshot) and zk3 (opened after the restore) remain, zk2 is gone
+                final SessionTracker sessionTracker = zks.getSessionTracker();
+                final Set<Long> globalSessions = sessionTracker.globalSessions();
+                assertEquals(2, globalSessions.size());
+                assertTrue(globalSessions.contains(zk1.getSessionId()));
+                Assertions.assertFalse(globalSessions.contains(zk2.getSessionId()));
+                assertTrue(globalSessions.contains(zk3.getSessionId()));
+
+                // the server must be back in RUNNING state once the restore returns
+                assertEquals(ZooKeeperServer.State.RUNNING, zks.state);
+
+                // validate being able to create more data after restore
+                zk3.create(BASE_PATH + "_" + "after", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+                assertEquals(NODE_COUNT + 4, zk3.getAllChildrenNumber("/"));
+            }
+        } finally {
+            System.clearProperty("zookeeper.serializeLastProcessedZxid.enabled");
+
+            if (zk1 != null) {
+                zk1.close();
+            }
+            if (zk2 != null) {
+                zk2.close();
+            }
+            if (zk3 != null) {
+                zk3.close();
+            }
+
+            zks.shutdown();
+            serverCnxnFactory.shutdown();
+        }
+    }
+
+    @Test
+    public void testRestoreFromSnapshot_nullInputStream() throws Exception {
+        final ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        assertThrows(IllegalArgumentException.class, () -> zks.restoreFromSnapshot(null));
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java
similarity index 68%
rename from zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java
rename to zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java
index a9953b7c0..c9d74a4cb 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/TakeSnapshotTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/ZookeeperServerSnapshotTest.java
@@ -16,26 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper;
+package org.apache.zookeeper.server;
 
+import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.File;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.test.ClientBase;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
-public class TakeSnapshotTest extends ClientBase {
+public class ZookeeperServerSnapshotTest extends ZKTestCase {
     private static final String BASE_PATH = "/takeSnapshotTest";
-    private static final int NODE_COUNT = 100;
-    private static final String HOSTPORT = "127.0.0.1:" + PortAssignment.unique();
-    private ZooKeeper zk;
+    private static final int NODE_COUNT = 10;
+    private static final String HOST_PORT = "127.0.0.1:" + PortAssignment.unique();
 
     @TempDir
     static File dataDir;
@@ -43,32 +43,19 @@ public class TakeSnapshotTest extends ClientBase {
     @TempDir
     static File logDir;
 
-
-    @BeforeEach
-    public void setUp() throws Exception {
-        super.setUp();
-        ClientBase.setupTestEnv();
-    }
-
-    @AfterEach
-    public void tearDown() throws Exception {
-        if (zk != null) {
-            zk.close();
-        }
-    }
-
     @Test
-    public void testTakeSnapshotAndRestore() throws Exception {
+    public void testTakeSnapshot() throws Exception {
         ZooKeeperServer zks = new ZooKeeperServer(dataDir, logDir, 3000);
         ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
 
-        final int port = Integer.parseInt(HOSTPORT.split(":")[1]);
+        final int port = Integer.parseInt(HOST_PORT.split(":")[1]);
         final ServerCnxnFactory serverCnxnFactory = ServerCnxnFactory.createFactory(port, -1);
-        serverCnxnFactory.startup(zks);
-        assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+        ZooKeeper zk = null;
+        try  {
+            serverCnxnFactory.startup(zks);
+            assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
 
-        try {
-            zk = ClientBase.createZKClient(HOSTPORT);
+            zk = ClientBase.createZKClient(HOST_PORT);
             for (int i = 0; i < NODE_COUNT; i++) {
                 final String path = BASE_PATH + "-" + i;
                 zk.create(path, String.valueOf(i).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
@@ -81,14 +68,14 @@ public class TakeSnapshotTest extends ClientBase {
             zk.close();
             zks.shutdown();
 
-            // start server again and assert the data restored from snapshot
+            // restart server and assert the data is restored from snapshot
             zks = new ZooKeeperServer(dataDir, logDir, 3000);
             ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
 
             serverCnxnFactory.startup(zks);
-            assertTrue(ClientBase.waitForServerUp(HOSTPORT, CONNECTION_TIMEOUT));
+            assertTrue(ClientBase.waitForServerUp(HOST_PORT, CONNECTION_TIMEOUT));
 
-            zk = ClientBase.createZKClient(HOSTPORT);
+            zk = ClientBase.createZKClient(HOST_PORT);
             for (int i = 0; i < NODE_COUNT; i++) {
                 final String path = BASE_PATH + "-" + i;
                 final String expectedData = String.valueOf(i);
@@ -96,6 +83,10 @@ public class TakeSnapshotTest extends ClientBase {
             }
             assertEquals(NODE_COUNT + 3, zk.getAllChildrenNumber("/"));
         } finally {
+            if (zk != null) {
+                zk.close();
+            }
+
             zks.shutdown();
             serverCnxnFactory.shutdown();
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
index 2d74776e3..c60ae868d 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
@@ -19,8 +19,9 @@
 package org.apache.zookeeper.server.admin;
 
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
-import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
@@ -30,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -57,6 +59,8 @@ public class CommandsTest extends ClientBase {
      *            - the primary name of the command
      * @param kwargs
      *            - keyword arguments to the command
+     * @param inputStream
+     *            - input for a POST command; when null the command is run as a GET command
      * @param expectedHeaders
      *            - expected HTTP response headers
      * @param expectedStatusCode
@@ -66,11 +70,12 @@ public class CommandsTest extends ClientBase {
      * @throws IOException
      * @throws InterruptedException
      */
-    private void testCommand(String cmdName, Map<String, String> kwargs,
+    private void testCommand(String cmdName, Map<String, String> kwargs, InputStream inputStream,
                              Map<String, String> expectedHeaders, int expectedStatusCode,
                              Field... fields) throws IOException, InterruptedException {
         ZooKeeperServer zks = serverFactory.getZooKeeperServer();
-        final CommandResponse commandResponse = Commands.runCommand(cmdName, zks, kwargs);
+        final CommandResponse commandResponse = inputStream == null
+        ? Commands.runGetCommand(cmdName, zks, kwargs) : Commands.runPostCommand(cmdName, zks, inputStream);
         assertNotNull(commandResponse);
         assertEquals(expectedStatusCode, commandResponse.getStatusCode());
         try (final InputStream responseStream = commandResponse.getInputStream()) {
@@ -102,7 +107,7 @@ public class CommandsTest extends ClientBase {
     }
 
     public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
-        testCommand(cmdName, new HashMap<String, String>(), new HashMap<>(), HttpServletResponse.SC_OK, fields);
+        testCommand(cmdName, new HashMap<String, String>(), null, new HashMap<>(), HttpServletResponse.SC_OK, fields);
     }
 
     private static class Field {
@@ -115,16 +120,6 @@ public class CommandsTest extends ClientBase {
         }
     }
 
-    @Test
-    public void testSnapshot_streaming() throws IOException, InterruptedException {
-        testSnapshot(true);
-    }
-
-    @Test
-    public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
-        testSnapshot(false);
-    }
-
     @Test
     public void testConfiguration() throws IOException, InterruptedException {
         testCommand("configuration", new Field("client_port", Integer.class), new Field("data_dir", String.class), new Field("data_log_dir", String.class), new Field("tick_time", Integer.class), new Field("max_client_cnxns", Integer.class), new Field("min_session_timeout", Integer.class), new Field("max_session_timeout", Integer.class), new Field("server_id", Long.class), new Field("client_port_listen_backlog", Integer.class));
@@ -230,6 +225,45 @@ public class CommandsTest extends ClientBase {
         testCommand("ruok");
     }
 
+    @Test
+    public void testRestore_invalidInputStream() throws IOException, InterruptedException {
+        setupForRestoreCommand();
+
+        try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes())){
+            final Map<String, String> kwargs = new HashMap<>();
+            final Map<String, String> expectedHeaders = new HashMap<>();
+            testCommand("restore", kwargs, inputStream, expectedHeaders, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+        } finally {
+            clearForRestoreCommand();
+        }
+    }
+
+    @Test
+    public void testRestore_nullInputStream() {
+        setupForRestoreCommand();
+        final ZooKeeperServer zks = serverFactory.getZooKeeperServer();
+        try {
+            final CommandResponse commandResponse = Commands.runPostCommand("restore", zks, null);
+            assertNotNull(commandResponse);
+            assertEquals(HttpServletResponse.SC_BAD_REQUEST, commandResponse.getStatusCode());
+        } finally {
+          clearForRestoreCommand();
+          if (zks != null) {
+              zks.shutdown();
+          }
+        }
+    }
+
+    @Test
+    public void testSnapshot_streaming() throws IOException, InterruptedException {
+        testSnapshot(true);
+    }
+
+    @Test
+    public void testSnapshot_nonStreaming() throws IOException, InterruptedException {
+        testSnapshot(false);
+    }
+
     @Test
     public void testServerStats() throws IOException, InterruptedException {
         testCommand("server_stats", new Field("version", String.class), new Field("read_only", Boolean.class), new Field("server_stats", ServerStats.class), new Field("node_count", Integer.class), new Field("client_response", BufferStats.class));
@@ -239,7 +273,7 @@ public class CommandsTest extends ClientBase {
     public void testSetTraceMask() throws IOException, InterruptedException {
         Map<String, String> kwargs = new HashMap<String, String>();
         kwargs.put("traceMask", "1");
-        testCommand("set_trace_mask", kwargs, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
+        testCommand("set_trace_mask", kwargs, null, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
     }
 
     @Test
@@ -289,7 +323,7 @@ public class CommandsTest extends ClientBase {
         when(zkServer.getSecureServerCnxnFactory()).thenReturn(cnxnFactory);
 
         // Act
-        CommandResponse response = cmd.run(zkServer, null);
+        CommandResponse response = cmd.runGet(zkServer, null);
 
         // Assert
         assertThat(response.toMap().containsKey("connections"), is(true));
@@ -313,7 +347,7 @@ public class CommandsTest extends ClientBase {
         when(zkServer.getZKDatabase()).thenReturn(zkDatabase);
         when(zkDatabase.getNodeCount()).thenReturn(0);
 
-        CommandResponse response = cmd.run(zkServer, null);
+        CommandResponse response = cmd.runGet(zkServer, null);
 
         assertThat(response.toMap().containsKey("connections"), is(true));
         assertThat(response.toMap().containsKey("secure_connections"), is(true));
@@ -321,7 +355,7 @@ public class CommandsTest extends ClientBase {
 
     private void testSnapshot(final boolean streaming) throws IOException, InterruptedException {
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
-        System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
         try {
             final Map<String, String> kwargs = new HashMap<>();
@@ -329,12 +363,23 @@ public class CommandsTest extends ClientBase {
             final Map<String, String> expectedHeaders = new HashMap<>();
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
-            testCommand("snapshot", kwargs, expectedHeaders, HttpServletResponse.SC_OK);
+            testCommand("snapshot", kwargs, null, expectedHeaders, HttpServletResponse.SC_OK);
         } finally {
             System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
-            System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
+            System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
             System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
         }
     }
 
+    private void setupForRestoreCommand() {
+        System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
+        System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+    }
+
+    private void clearForRestoreCommand() {
+        System.clearProperty(ADMIN_RESTORE_ENABLED);
+        System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
+        System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
index b06cde6fd..41f00751a 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/JettyAdminServerTest.java
@@ -63,7 +63,7 @@ public class JettyAdminServerTest extends ZKTestCase {
 
     static final String URL_FORMAT = "http://localhost:%d/commands";
     static final String HTTPS_URL_FORMAT = "https://localhost:%d/commands";
-    static final int jettyAdminPort = PortAssignment.unique();
+    private final int jettyAdminPort = PortAssignment.unique();
 
     @BeforeEach
     public void enableServer() {
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
similarity index 56%
rename from zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java
rename to zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
index 5b5dbd979..f7d357e72 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotCommandTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
@@ -19,20 +19,22 @@
 package org.apache.zookeeper.server.admin;
 
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
-import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_INTERVAL;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.REQUEST_QUERY_PARAM_STREAMING;
 import static org.apache.zookeeper.server.admin.JettyAdminServerTest.URL_FORMAT;
-import static org.apache.zookeeper.server.admin.JettyAdminServerTest.jettyAdminPort;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.io.BufferedReader;
+import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.HttpURLConnection;
 import java.net.URL;
@@ -40,6 +42,11 @@ import java.net.URLEncoder;
 import java.nio.file.Files;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.zip.CheckedInputStream;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -48,8 +55,11 @@ import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.common.IOUtils;
+import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.SnapStream;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -60,13 +70,14 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
-public class SnapshotCommandTest extends ZKTestCase {
-    private static final Logger LOG = LoggerFactory.getLogger(SnapshotCommandTest.class);
+public class SnapshotAndRestoreCommandTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(SnapshotAndRestoreCommandTest.class);
 
-    private static final String PATH = "/snapshot_test";
+    private static final String SNAPSHOT_TEST_PATH = "/snapshot_test";
     private static final int NODE_COUNT = 10;
 
-    private final String hostPort =  "127.0.0.1:" + PortAssignment.unique();
+    private final String hostPort = "127.0.0.1:" + PortAssignment.unique();
+    private final int jettyAdminPort = PortAssignment.unique();
     private ServerCnxnFactory cnxnFactory;
     private JettyAdminServer adminServer;
     private ZooKeeperServer zks;
@@ -91,9 +102,10 @@ public class SnapshotCommandTest extends ZKTestCase {
         // start AdminServer
         System.setProperty("zookeeper.admin.enableServer", "true");
         System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
-        System.setProperty(ADMIN_SNAPSHOT_INTERVAL, "0");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+        System.setProperty(ADMIN_RESTORE_ENABLED, "true");
 
         adminServer = new JettyAdminServer();
         adminServer.setZooKeeperServer(zks);
@@ -101,7 +113,7 @@ public class SnapshotCommandTest extends ZKTestCase {
 
         // create Zookeeper client and test data
         zk = ClientBase.createZKClient(hostPort);
-        createData(zk, NODE_COUNT);
+        createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT);
     }
 
     @AfterAll
@@ -109,9 +121,10 @@ public class SnapshotCommandTest extends ZKTestCase {
         System.clearProperty("zookeeper.4lw.commands.whitelist");
         System.clearProperty("zookeeper.admin.enableServer");
         System.clearProperty("zookeeper.admin.serverPort");
+        System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
         System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
-        System.clearProperty(ADMIN_SNAPSHOT_INTERVAL);
         System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+        System.clearProperty(ADMIN_RESTORE_ENABLED);
 
         if (zk != null) {
             zk.close();
@@ -131,19 +144,85 @@ public class SnapshotCommandTest extends ZKTestCase {
     }
 
     @Test
-    public void testSnapshotCommand_streaming() throws Exception {
-        // take snapshot with streaming
-        final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+    public void testSnapshotAndRestoreCommand_streaming() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
 
-        // validate snapshot response
-        assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
-        validateResponseHeaders(snapshotConn);
-        final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
-        try (final InputStream inputStream = snapshotConn.getInputStream();
-             final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
-            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
-            final long fileSize = Files.size(snapshotFile.toPath());
-            assertTrue(fileSize > 0);
+        // take snapshot with streaming and validate
+        final File snapshotFile = takeSnapshotAndValidate();
+
+        // validate snapshot metrics
+        validateSnapshotMetrics();
+
+        // restore from snapshot and validate
+        performRestoreAndValidate(snapshotFile);
+
+        // validate creating data after restore
+        try (final ZooKeeper zk = ClientBase.createZKClient(hostPort)) {
+            createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT + 1);
+            assertEquals(NODE_COUNT + NODE_COUNT + 1, zk.getAllChildrenNumber(SNAPSHOT_TEST_PATH));
+        }
+
+        // validate restore metrics
+        validateRestoreMetrics();
+    }
+
+    @Test
+    public void testClientRequest_restoreInProgress() throws Exception {
+        final int threadCount = 2;
+        final int nodeCount = 50;
+        final String restoreTestPath = "/restore_test";
+
+        // take snapshot
+        final File snapshotFile = takeSnapshotAndValidate();
+
+        final ExecutorService service = Executors.newFixedThreadPool(threadCount);
+        final CountDownLatch latch = new CountDownLatch(threadCount);
+        final AtomicBoolean createSucceeded = new AtomicBoolean(false);
+        final AtomicBoolean restoreSucceeded = new AtomicBoolean(false);
+
+        // thread 1 creates data
+        service.submit(() -> {
+            try {
+                createData(zk, restoreTestPath, nodeCount);
+                createSucceeded.set(true);
+            } catch (final Exception e) {
+                LOG.error("Failed to create data while restore is in progress", e);
+            } finally {
+                latch.countDown();
+            }
+        });
+
+        // thread 2 performs restore operation
+        service.submit(() -> {
+            try {
+                performRestoreAndValidate(snapshotFile);
+                restoreSucceeded.set(true);
+            } catch (final Exception e) {
+                LOG.error("Failed to restore from snapshot", e);
+            } finally {
+                latch.countDown();
+            }
+        });
+
+        // no further tasks are submitted; let the pool threads exit once both tasks finish
+        service.shutdown();
+        // wait for both operations to complete
+        latch.await();
+
+        // the two operations run concurrently; the node count is only checked when both succeeded
+        if (createSucceeded.get() && restoreSucceeded.get()) {
+            assertEquals(nodeCount, zk.getAllChildrenNumber(restoreTestPath));
+        }
+    }
+
+    @Test
+    public void testRestores() throws Exception {
+        // take snapshot
+        final File snapshotFile = takeSnapshotAndValidate();
+
+        // perform restores
+        for (int i = 0; i < 3; i++) {
+            performRestoreAndValidate(snapshotFile);
         }
     }
 
@@ -186,16 +265,47 @@ public class SnapshotCommandTest extends ZKTestCase {
         }
     }
 
-    private void createData(final ZooKeeper zk, final long count) throws Exception {
+    @Test
+    public void testRestoreCommand_disabled() throws Exception {
+        System.setProperty(ADMIN_RESTORE_ENABLED, "false");
+        try {
+            final HttpURLConnection restoreConn = sendRestoreRequest();
+            assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, restoreConn.getResponseCode());
+        } finally {
+            System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+        }
+    }
+
+    @Test
+    public void testRestoreCommand_serializeLastZxidDisabled() throws Exception {
+        ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
+        try {
+            final HttpURLConnection restoreConn = sendRestoreRequest();
+            assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
+        } finally {
+            ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
+        }
+    }
+
+    @Test
+    public void testRestoreCommand_invalidSnapshotData() throws Exception {
+        final HttpURLConnection restoreConn = sendRestoreRequest();
+        try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes());
+             final OutputStream outputStream = restoreConn.getOutputStream()) {
+            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+        }
+        assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
+    }
+
+    private void createData(final ZooKeeper zk, final String parentPath, final long count) throws Exception {
         try {
-            zk.create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            zk.create(parentPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         } catch (final KeeperException.NodeExistsException ignore) {
             // ignore
         }
 
         for (int i = 0; i < count; i++) {
-            final String processNodePath = zk.create(String.format("%s/%s", PATH, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
-            LOG.info("Node created. path={}" + processNodePath);
+            zk.create(String.format("%s/%s", parentPath, "n_"), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
         }
     }
 
@@ -208,6 +318,15 @@ public class SnapshotCommandTest extends ZKTestCase {
         return snapshotConn;
     }
 
+    private HttpURLConnection sendRestoreRequest() throws Exception  {
+        final URL restoreURL = new URL(String.format(URL_FORMAT + "/restore", jettyAdminPort));
+        final HttpURLConnection restoreConn = (HttpURLConnection) restoreURL.openConnection();
+        restoreConn.setDoOutput(true);
+        restoreConn.setRequestMethod("POST");
+
+        return restoreConn;
+    }
+
     private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
         final Map<String, String> parameters = new HashMap<>();
         parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
@@ -254,4 +373,47 @@ public class SnapshotCommandTest extends ZKTestCase {
             LOG.info("Response payload: {}", sb);
         }
     }
+
+    private void validateSnapshotMetrics() {
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+        assertEquals(0, (long) metrics.get("snapshot_error_count"));
+        assertEquals(0, (long) metrics.get("snapshot_rate_limited_count"));
+        assertTrue((Double) metrics.get("avg_snapshottime") > 0.0);
+    }
+
+    private void validateRestoreMetrics() {
+        Map<String, Object> metrics = MetricsUtils.currentServerMetrics();
+        assertEquals(0, (long) metrics.get("restore_error_count"));
+        assertEquals(0, (long) metrics.get("restore_rate_limited_count"));
+        assertTrue((Double) metrics.get("avg_restore_time") > 0.0);
+    }
+
+    private File takeSnapshotAndValidate() throws Exception {
+        // take snapshot with streaming
+        final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+
+        // validate snapshot response
+        assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
+        validateResponseHeaders(snapshotConn);
+        final File snapshotFile = new File(dataDir + "/snapshot." + System.currentTimeMillis());
+        try (final InputStream inputStream = snapshotConn.getInputStream();
+             final FileOutputStream outputStream = new FileOutputStream(snapshotFile)) {
+            IOUtils.copyBytes(inputStream, outputStream, 1024, true);
+            final long fileSize = Files.size(snapshotFile.toPath());
+            assertTrue(fileSize > 0);
+        }
+        return snapshotFile;
+    }
+
+    private void performRestoreAndValidate(final File snapshotFile) throws Exception {
+        // perform restore
+        final HttpURLConnection restoreConn = sendRestoreRequest();
+        try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile);
+             final OutputStream outputStream = restoreConn.getOutputStream()) {
+            IOUtils.copyBytes(is, outputStream, 1024, true);
+        }
+        // validate restore response
+        assertEquals(HttpURLConnection.HTTP_OK, restoreConn.getResponseCode());
+        displayResponsePayload(restoreConn);
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
index ffd659978..573d361ae 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
@@ -293,7 +293,7 @@ public class ObserverMasterTest extends ObserverMasterTestBase {
 
         // test stats collection
         final Map<String, String> emptyMap = Collections.emptyMap();
-        Map<String, Object> stats = Commands.runCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+        Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
         assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id");
 
         // check the stats for the first peer