You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by eo...@apache.org on 2023/02/24 15:21:18 UTC

[zookeeper] branch master updated: ZOOKEEPER-4639: Provide auth support for admin server APIs (#1966)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new d79811bf2 ZOOKEEPER-4639: Provide auth support for admin server APIs (#1966)
d79811bf2 is described below

commit d79811bf28f00fb1db6ec6002e884af6cfd0d7fc
Author: li4wang <68...@users.noreply.github.com>
AuthorDate: Fri Feb 24 07:20:58 2023 -0800

    ZOOKEEPER-4639: Provide auth support for admin server APIs (#1966)
    
    Author: Li Wang <li...@apple.com>
---
 .../src/main/resources/markdown/html/header.html   |   3 +
 .../src/main/resources/markdown/index.md           |   1 +
 .../src/main/resources/markdown/zookeeperAdmin.md  |  15 +-
 .../markdown/zookeeperSnapshotAndRestore.md        |  68 ++++
 .../apache/zookeeper/server/ZooKeeperServer.java   |   2 +-
 .../apache/zookeeper/server/admin/AuthRequest.java |  66 ++++
 .../org/apache/zookeeper/server/admin/Command.java |  11 +-
 .../apache/zookeeper/server/admin/CommandBase.java |  20 +-
 .../apache/zookeeper/server/admin/Commands.java    | 124 ++++++-
 .../apache/zookeeper/server/admin/GetCommand.java  |   4 +
 .../zookeeper/server/admin/JettyAdminServer.java   |  16 +-
 .../apache/zookeeper/server/admin/PostCommand.java |   4 +-
 .../zookeeper/server/admin/StreamOutputter.java    |   2 +-
 .../server/auth/AuthenticationProvider.java        |  19 +
 .../server/auth/DigestAuthenticationProvider.java  |  41 ++-
 .../server/auth/IPAuthenticationProvider.java      |  15 +-
 .../server/auth/WrappedAuthenticationProvider.java |  14 +
 .../server/auth/X509AuthenticationProvider.java    |  76 ++--
 .../zookeeper/server/admin/CommandAuthTest.java    | 410 +++++++++++++++++++++
 .../zookeeper/server/admin/CommandsTest.java       |  19 +-
 .../zookeeper/server/admin/RestoreQuorumTest.java  | 116 ++++++
 .../admin/SnapshotAndRestoreCommandTest.java       |  63 ++--
 .../server/quorum/QuorumPeerTestBase.java          |  17 +-
 .../java/org/apache/zookeeper/test/IPAuthTest.java |  59 +++
 .../apache/zookeeper/test/ObserverMasterTest.java  |   2 +-
 .../org/apache/zookeeper/test/X509AuthTest.java    |  37 +-
 26 files changed, 1103 insertions(+), 121 deletions(-)

diff --git a/zookeeper-docs/src/main/resources/markdown/html/header.html b/zookeeper-docs/src/main/resources/markdown/html/header.html
index f58a99f04..c308b2d88 100644
--- a/zookeeper-docs/src/main/resources/markdown/html/header.html
+++ b/zookeeper-docs/src/main/resources/markdown/html/header.html
@@ -96,6 +96,9 @@ document.write("Last Published: " + document.lastModified);
             <div class="menuitem">
                 <a href="zookeeperQuotas.html">Quota Guide</a>
             </div>
+            <div class="menuitem">
+                <a href="zookeeperSnapshotAndRestore.html">Snapshot and Restore Guide</a>
+            </div>
             <div class="menuitem">
                 <a href="zookeeperJMX.html">JMX</a>
             </div>
diff --git a/zookeeper-docs/src/main/resources/markdown/index.md b/zookeeper-docs/src/main/resources/markdown/index.md
index 85574f30f..d3e3864f3 100644
--- a/zookeeper-docs/src/main/resources/markdown/index.md
+++ b/zookeeper-docs/src/main/resources/markdown/index.md
@@ -46,6 +46,7 @@ archives.
     Documents for Administrators and Operations Engineers of ZooKeeper Deployments
     + [Administrator's Guide](zookeeperAdmin.html) - a guide for system administrators and anyone else who might deploy ZooKeeper
     + [Quota Guide](zookeeperQuotas.html) - a guide for system administrators on Quotas in ZooKeeper.
+    + [Snapshot and Restore Guide](zookeeperSnapshotAndRestore.html) - a guide for system administrators on take snapshot and restore ZooKeeper.
     + [JMX](zookeeperJMX.html) - how to enable JMX in ZooKeeper
     + [Hierarchical Quorums](zookeeperHierarchicalQuorums.html) - a guide on how to use hierarchical quorums
     + [Oracle Quorum](zookeeperOracleQuorums.html) - the introduction to Oracle Quorum increases the availability of a cluster of 2 ZooKeeper instances with a failure detector.
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
index 1e317a0b4..83a749d0a 100644
--- a/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperAdmin.md
@@ -2124,15 +2124,18 @@ options are used to configure the [AdminServer](#sc_adminserver).
 
 * *admin.snapshot.enabled* :
   (Java system property: **zookeeper.admin.snapshot.enabled**)
-  The flag for enabling the snapshot command. Defaults to false. 
-  It will be enabled by default once the auth support for admin server commands 
-  is available.
+  The flag for enabling the snapshot command. Defaults to true. 
+
   
 * *admin.restore.enabled* :
   (Java system property: **zookeeper.admin.restore.enabled**)
-  The flag for enabling the restore command. Defaults to false.
-  It will be enabled by default once the auth support for admin server commands
-  is available.
+  The flag for enabling the restore command. Defaults to true.
+
+
+* *admin.needClientAuth* :
+  (Java system property: **zookeeper.admin.needClientAuth**)
+  The flag to control whether client auth is needed. Using x509 auth requires true.
+  Defaults to false.
 
 **New in 3.7.1:** The following
 options are used to configure the [AdminServer](#sc_adminserver).
diff --git a/zookeeper-docs/src/main/resources/markdown/zookeeperSnapshotAndRestore.md b/zookeeper-docs/src/main/resources/markdown/zookeeperSnapshotAndRestore.md
new file mode 100644
index 000000000..576f18fda
--- /dev/null
+++ b/zookeeper-docs/src/main/resources/markdown/zookeeperSnapshotAndRestore.md
@@ -0,0 +1,68 @@
+<!--
+Copyright 2002-2004 The Apache Software Foundation
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+//-->
+
+# ZooKeeper Snapshot and Restore Guide
+
+Zookeeper is designed to withstand machine failures. A Zookeeper cluster can automatically recover 
+from temporary failures such as machine reboot. It can also tolerate up to (N-1)/2 permanent 
+failures for a cluster of N members due to hardware failures or disk corruption, etc. When a member 
+permanently fails, it loses access to the cluster. If the cluster permanently loses more than 
+(N-1)/2 members, it disastrously fails and loses quorum. Once the quorum is lost, the cluster 
+cannot reach consensus and therefore cannot continue to accept updates.
+
+To recover from such disastrous failures, Zookeeper provides snapshot and restore functionalities to
+restore a cluster from a snapshot.
+
+1. Snapshot and restore operate on the connected server via Admin Server APIs
+1. Snapshot and restore are rate limited to protect the server from being overloaded
+1. Snapshot and restore require authentication and authorization on the root path with ALL permission.
+The supported auth schemas are digest, x509 and IP.
+   
+* [Snapshot](#zookeeper_snapshot)
+* [Restore](#zookeeper_restore)  
+
+<a name="zookeeper_snapshot"></a>
+
+## Snapshot
+Recovering a cluster needs a snapshot from a ZooKeeper cluster. Users can periodically take
+snapshots from a live server which has the highest zxid and stream out data to a local 
+or external storage/file system (e.g., S3).
+
+ ```bash
+ # The snapshot command takes snapshot from the server it connects to and rate limited to once every 5 mins by default
+ curl -H 'Authorization: digest root:root_passwd' http://hostname:adminPort/commands/snapshot?streaming=true --output snapshotFileName
+ ```
+
+<a name="zookeeper_restore"></a>
+## Restore
+
+Restoring a cluster needs a single snapshot as input stream. Restore can be used for recovering a 
+cluster for quorum lost or building a brand-new cluster with seed data. 
+
+All members should restore using the same snapshot. The following are the recommended steps:
+ 
+- Blocking traffic on the client port or client secure port before restore starts
+- Take a snapshot of the latest database state using the snapshot admin server command if applicable
+- For each server
+  - Moving the files in dataDir and dataLogDir to different location to prevent the restored database 
+    from being overwritten when server restarts after restore
+  - Restore the server using restore admin server command
+- Unblocking traffic on the client port or client secure port after restore completes
+
+ ```bash
+ # The restore command takes a snapshot as input stream and restore the db of the server it connects. It is rate limited to once every 5 mins by default
+ curl -H 'Content-Type:application/octet-stream' -H 'Authorization: digest root:root_passwd' -POST http://hostname:adminPort/commands/restore --data-binary "@snapshotFileName" 
+ ```
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
index 72be66aab..f460b2d5b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
@@ -2069,7 +2069,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
 
     /**
      * Grant or deny authorization to an operation on a node as a function of:
-     * @param cnxn :    the server connection
+     * @param cnxn :    the server connection or null for admin server commands
      * @param acl :     set of ACLs for the node
      * @param perm :    the permission that the client is requesting
      * @param ids :     the credentials supplied by the client
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/AuthRequest.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/AuthRequest.java
new file mode 100644
index 000000000..5552a2420
--- /dev/null
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/AuthRequest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import org.apache.zookeeper.ZooDefs;
+
+/**
+ * Class representing auth data for performing ACL check on admin server commands.
+ *
+ * For example, SnapshotCommand requires {@link ZooDefs.Perms.ALL} permission on
+ * the root node.
+ *
+ */
+public class AuthRequest {
+    private final int permission;
+    private final String path;
+
+    /**
+     * @param permission
+     *                      the required permission for auth check
+     * @param path
+     *                      the ZNode path for auth check
+     */
+    public AuthRequest(final int permission, final String path) {
+        this.permission = permission;
+        this.path = path;
+    }
+
+    /**
+     * @return permission
+     */
+    public int getPermission() {
+        return permission;
+    }
+
+    /**
+     * @return ZNode path
+     */
+    public String getPath() {
+        return path;
+    }
+
+    @Override
+    public String toString() {
+        return "AuthRequest{"
+        + "permission=" + permission
+        + ", path='" + path + '\''
+        + '}';
+    }
+}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
index 5d06356b2..5bc332a28 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Command.java
@@ -45,18 +45,17 @@ public interface Command {
      */
     String getPrimaryName();
 
-    /**
-     * A string documenting this command (e.g., what it does, any arguments it
-     * takes).
-     */
-    String getDoc();
-
     /**
      * @return true if the command requires an active ZooKeeperServer or a
      *     synced peer in order to resolve
      */
     boolean isServerRequired();
 
+    /**
+     * @return AuthRequest associated to the command. Null means auth check is not required.
+     */
+    AuthRequest getAuthRequest();
+
     /**
      * Run this command for HTTP GET request. Commands take a ZooKeeperServer, String-valued keyword
      * arguments and return a CommandResponse object containing any information
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandBase.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandBase.java
index b3714c256..2a9ef99bb 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandBase.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/CommandBase.java
@@ -26,24 +26,27 @@ public abstract class CommandBase implements Command {
 
     private final String primaryName;
     private final Set<String> names;
-    private final String doc;
     private final boolean serverRequired;
+    private final AuthRequest authRequest;
 
     /**
      * @param names The possible names of this command, with the primary name first.
      */
     protected CommandBase(List<String> names) {
-        this(names, true, null);
+        this(names, true);
     }
     protected CommandBase(List<String> names, boolean serverRequired) {
         this(names, serverRequired, null);
     }
 
-    protected CommandBase(List<String> names, boolean serverRequired, String doc) {
+    protected CommandBase(List<String> names, boolean serverRequired, AuthRequest authRequest) {
+        if (authRequest != null && !serverRequired) {
+            throw new IllegalArgumentException("An active server is required for auth check");
+        }
         this.primaryName = names.get(0);
         this.names = new HashSet<>(names);
-        this.doc = doc;
         this.serverRequired = serverRequired;
+        this.authRequest = authRequest;
     }
 
     @Override
@@ -56,16 +59,17 @@ public abstract class CommandBase implements Command {
         return names;
     }
 
-    @Override
-    public String getDoc() {
-        return doc;
-    }
 
     @Override
     public boolean isServerRequired() {
         return serverRequired;
     }
 
+    @Override
+    public AuthRequest getAuthRequest() {
+        return authRequest;
+    }
+
     /**
      * @return A response with the command set to the primary name and the
      *         error set to null (these are the two entries that all command
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
index dfdbe8e23..74dfdc3c1 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/Commands.java
@@ -25,6 +25,7 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,15 +38,23 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 import org.apache.zookeeper.Environment;
 import org.apache.zookeeper.Environment.Entry;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.Version;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.DataNode;
 import org.apache.zookeeper.server.DataTree;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooTrace;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
 import org.apache.zookeeper.server.persistence.SnapshotInfo;
 import org.apache.zookeeper.server.persistence.Util;
 import org.apache.zookeeper.server.quorum.Follower;
@@ -74,8 +83,13 @@ import org.slf4j.LoggerFactory;
 public class Commands {
 
     static final Logger LOG = LoggerFactory.getLogger(Commands.class);
+    // VisibleForTesting
     static final String ADMIN_RATE_LIMITER_INTERVAL = "zookeeper.admin.rateLimiterIntervalInMS";
     private static final long rateLimiterInterval = Integer.parseInt(System.getProperty(ADMIN_RATE_LIMITER_INTERVAL, "300000"));
+    // VisibleForTesting
+    static final String AUTH_INFO_SEPARATOR = " ";
+    // VisibleForTesting
+    static final String ROOT_PATH = "/";
 
     /** Maps command names to Command instances */
     private static Map<String, Command> commands = new HashMap<>();
@@ -105,6 +119,9 @@ public class Commands {
      * @param zkServer
      * @param kwargs String-valued keyword arguments to the command from HTTP GET request
      *        (may be null if command requires no additional arguments)
+     * @param authInfo auth info for auth check
+     *        (null if command requires no auth check)
+     * @param request HTTP request
      * @return Map representing response to command containing at minimum:
      *    - "command" key containing the command's primary name
      *    - "error" key containing a String error message or null if no error
@@ -112,17 +129,10 @@ public class Commands {
     public static CommandResponse runGetCommand(
             String cmdName,
             ZooKeeperServer zkServer,
-            Map<String, String> kwargs) {
-        Command command = getCommand(cmdName);
-        if (command == null) {
-            // set the status code to 200 to keep the current behavior of existing commands
-            return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
-        }
-        if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
-            // set the status code to 200 to keep the current behavior of existing commands
-            return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
-        }
-        return command.runGet(zkServer, kwargs);
+            Map<String, String> kwargs,
+            String authInfo,
+            HttpServletRequest request) {
+        return runCommand(cmdName, zkServer, kwargs, null, authInfo, request, true);
     }
 
     /**
@@ -141,17 +151,93 @@ public class Commands {
     public static CommandResponse runPostCommand(
             String cmdName,
             ZooKeeperServer zkServer,
-            InputStream inputStream) {
+            InputStream inputStream,
+            String authInfo,
+            HttpServletRequest request) {
+        return runCommand(cmdName, zkServer, null, inputStream, authInfo, request, false);
+    }
+
+    private static CommandResponse runCommand(
+            String cmdName,
+            ZooKeeperServer zkServer,
+            Map<String, String> kwargs,
+            InputStream inputStream,
+            String authInfo,
+            HttpServletRequest request,
+            boolean isGet) {
         Command command = getCommand(cmdName);
         if (command == null) {
             // set the status code to 200 to keep the current behavior of existing commands
+            LOG.warn("Unknown command");
             return new CommandResponse(cmdName, "Unknown command: " + cmdName, HttpServletResponse.SC_OK);
         }
         if (command.isServerRequired() && (zkServer == null || !zkServer.isRunning())) {
             // set the status code to 200 to keep the current behavior of existing commands
+            LOG.warn("This ZooKeeper instance is not currently serving requests for command");
             return new CommandResponse(cmdName, "This ZooKeeper instance is not currently serving requests", HttpServletResponse.SC_OK);
         }
-        return command.runPost(zkServer, inputStream);
+
+        final AuthRequest authRequest = command.getAuthRequest();
+        if (authRequest != null) {
+            if (authInfo == null) {
+                LOG.warn("Auth info is missing for command");
+                return new CommandResponse(cmdName, "Auth info is missing for the command", HttpServletResponse.SC_UNAUTHORIZED);
+            }
+            try {
+                final List<Id> ids = handleAuthentication(request, authInfo);
+                handleAuthorization(zkServer, ids, authRequest.getPermission(), authRequest.getPath());
+            } catch (final KeeperException.AuthFailedException e) {
+                return new CommandResponse(cmdName, "Not authenticated", HttpServletResponse.SC_UNAUTHORIZED);
+            } catch (final KeeperException.NoAuthException e) {
+                return new CommandResponse(cmdName, "Not authorized", HttpServletResponse.SC_FORBIDDEN);
+            } catch (final Exception e) {
+                LOG.warn("Error occurred during auth for command", e);
+                return new CommandResponse(cmdName, "Error occurred during auth", HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+            }
+        }
+        return isGet ? command.runGet(zkServer, kwargs) : command.runPost(zkServer, inputStream);
+    }
+
+    private static List<Id> handleAuthentication(final HttpServletRequest request, final String authInfo) throws KeeperException.AuthFailedException {
+        final String[] authData = authInfo.split(AUTH_INFO_SEPARATOR);
+        // for IP and x509, auth info only contains the schema and Auth Id will be extracted from HTTP request
+        if (authData.length != 1 && authData.length != 2) {
+            LOG.warn("Invalid auth info length");
+            throw new KeeperException.AuthFailedException();
+        }
+
+        final String schema = authData[0];
+        final ServerAuthenticationProvider authProvider = ProviderRegistry.getServerProvider(schema);
+        if (authProvider != null) {
+            try {
+                final byte[] auth = authData.length == 2 ? authData[1].getBytes(StandardCharsets.UTF_8) : null;
+                final List<Id> ids = authProvider.handleAuthentication(request, auth);
+                if (ids.isEmpty()) {
+                    LOG.warn("Auth Id list is empty");
+                    throw new KeeperException.AuthFailedException();
+                }
+                return ids;
+            } catch (final RuntimeException e) {
+                LOG.warn("Caught runtime exception from AuthenticationProvider", e);
+                throw new KeeperException.AuthFailedException();
+            }
+        } else {
+            LOG.warn("Auth provider not found for schema");
+            throw new KeeperException.AuthFailedException();
+        }
+    }
+
+    private static void handleAuthorization(final ZooKeeperServer zkServer,
+                                            final List<Id> ids,
+                                            final int perm,
+                                            final String path)
+            throws KeeperException.NoNodeException, KeeperException.NoAuthException {
+        final DataNode dataNode = zkServer.getZKDatabase().getNode(path);
+        if (dataNode == null) {
+            throw new KeeperException.NoNodeException(path);
+        }
+        final List<ACL> acls = zkServer.getZKDatabase().aclForNode(dataNode);
+        zkServer.checkACL(null, acls, perm, ids, path, null);
     }
 
     /**
@@ -532,15 +618,13 @@ public class Commands {
      */
     public static class RestoreCommand extends PostCommand {
         static final String RESPONSE_DATA_LAST_ZXID = "last_zxid";
-
         static final String ADMIN_RESTORE_ENABLED = "zookeeper.admin.restore.enabled";
 
-
         private RateLimiter rateLimiter;
 
         public RestoreCommand() {
-            super(Arrays.asList("restore", "rest"));
-            rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS);
+            super(Arrays.asList("restore", "rest"), true, new AuthRequest(ZooDefs.Perms.ALL, ROOT_PATH));
+            rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MILLISECONDS);
         }
 
         @Override
@@ -548,7 +632,7 @@ public class Commands {
             final CommandResponse response = initializeResponse();
 
             // check feature flag
-            final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "false"));
+            final boolean restoreEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_RESTORE_ENABLED, "true"));
             if (!restoreEnabled) {
                 response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                 LOG.warn("Restore command is disabled");
@@ -659,7 +743,7 @@ public class Commands {
         private final RateLimiter rateLimiter;
 
         public SnapshotCommand() {
-            super(Arrays.asList("snapshot", "snap"));
+            super(Arrays.asList("snapshot", "snap"), true, new AuthRequest(ZooDefs.Perms.ALL, ROOT_PATH));
             rateLimiter = new RateLimiter(1, rateLimiterInterval, TimeUnit.MICROSECONDS);
         }
 
@@ -670,7 +754,7 @@ public class Commands {
             final CommandResponse response = initializeResponse();
 
             // check feature flag
-            final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "false"));
+            final boolean snapshotEnabled = Boolean.parseBoolean(System.getProperty(ADMIN_SNAPSHOT_ENABLED, "true"));
             if (!snapshotEnabled) {
                 response.setStatusCode(HttpServletResponse.SC_SERVICE_UNAVAILABLE);
                 LOG.warn("Snapshot command is disabled");
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
index 509cad72e..ea7e5d907 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/GetCommand.java
@@ -36,6 +36,10 @@ public abstract class GetCommand extends CommandBase {
         super(names, serverRequired);
     }
 
+    protected GetCommand(List<String> names, boolean serverRequired, AuthRequest authRequest) {
+        super(names, serverRequired, authRequest);
+    }
+
     @Override
     public CommandResponse runPost(ZooKeeperServer zkServer, InputStream inputStream) {
         return null;
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
index 98f2cb051..a237e4c3b 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/JettyAdminServer.java
@@ -34,6 +34,7 @@ import org.apache.zookeeper.common.SecretUtils;
 import org.apache.zookeeper.common.X509Util;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.auth.IPAuthenticationProvider;
+import org.eclipse.jetty.http.HttpHeader;
 import org.eclipse.jetty.http.HttpVersion;
 import org.eclipse.jetty.security.ConstraintMapping;
 import org.eclipse.jetty.security.ConstraintSecurityHandler;
@@ -89,7 +90,8 @@ public class JettyAdminServer implements AdminServer {
             System.getProperty("zookeeper.admin.commandURL", DEFAULT_COMMAND_URL),
             Integer.getInteger("zookeeper.admin.httpVersion", DEFAULT_HTTP_VERSION),
             Boolean.getBoolean("zookeeper.admin.portUnification"),
-            Boolean.getBoolean("zookeeper.admin.forceHttps"));
+            Boolean.getBoolean("zookeeper.admin.forceHttps"),
+            Boolean.getBoolean("zookeeper.admin.needClientAuth"));
     }
 
     public JettyAdminServer(
@@ -99,7 +101,8 @@ public class JettyAdminServer implements AdminServer {
         String commandUrl,
         int httpVersion,
         boolean portUnification,
-        boolean forceHttps) throws IOException, GeneralSecurityException {
+        boolean forceHttps,
+        boolean needClientAuth) throws IOException, GeneralSecurityException {
 
         this.port = port;
         this.idleTimeout = timeout;
@@ -144,11 +147,12 @@ public class JettyAdminServer implements AdminServer {
                     throw e;
                 }
 
-                SslContextFactory sslContextFactory = new SslContextFactory.Server();
+                SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
                 sslContextFactory.setKeyStore(keyStore);
                 sslContextFactory.setKeyStorePassword(privateKeyPassword);
                 sslContextFactory.setTrustStore(trustStore);
                 sslContextFactory.setTrustStorePassword(certAuthPassword);
+                sslContextFactory.setNeedClientAuth(needClientAuth);
 
                 if (forceHttps) {
                     connector = new ServerConnector(server,
@@ -259,9 +263,10 @@ public class JettyAdminServer implements AdminServer {
             for (Map.Entry<String, String[]> entry : parameterMap.entrySet()) {
                 kwargs.put(entry.getKey(), entry.getValue()[0]);
             }
+            final String authInfo = request.getHeader(HttpHeader.AUTHORIZATION.asString());
 
             // Run the command
-            final CommandResponse cmdResponse = Commands.runGetCommand(cmd, zkServer, kwargs);
+            final CommandResponse cmdResponse = Commands.runGetCommand(cmd, zkServer, kwargs, authInfo, request);
             response.setStatus(cmdResponse.getStatusCode());
 
             final Map<String, String> headers = cmdResponse.getHeaders();
@@ -293,7 +298,8 @@ public class JettyAdminServer implements AdminServer {
                               final HttpServletResponse response) throws ServletException, IOException {
             final String cmdName = extractCommandNameFromURL(request, response);
             if (cmdName != null) {
-                final CommandResponse cmdResponse = Commands.runPostCommand(cmdName, zkServer, request.getInputStream());
+                final String authInfo = request.getHeader(HttpHeader.AUTHORIZATION.asString());
+                final CommandResponse cmdResponse = Commands.runPostCommand(cmdName, zkServer, request.getInputStream(), authInfo, request);
                 final String clientIP = IPAuthenticationProvider.getClientIPAddress(request);
                 sendJSONResponse(response, cmdResponse, clientIP);
             }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
index cb346b2e0..3f5d50a88 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/PostCommand.java
@@ -26,8 +26,8 @@ import java.util.Map;
 import org.apache.zookeeper.server.ZooKeeperServer;
 
 public abstract class PostCommand extends CommandBase {
-    protected PostCommand(List<String> names) {
-        super(names);
+    protected PostCommand(List<String> names, boolean serverRequired, AuthRequest authRequest) {
+        super(names, serverRequired, authRequest);
     }
 
     @Override
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
index e8f68e649..b0caa1923 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/admin/StreamOutputter.java
@@ -46,7 +46,7 @@ public class StreamOutputter implements CommandOutputter{
         try (final InputStream is = response.getInputStream()){
             IOUtils.copyBytes(is, os, 1024, true);
         } catch (final IOException e) {
-            LOG.error("Exception occurred when streaming out data to {}", clientIP, e);
+            LOG.warn("Exception streaming out data to {}", clientIP, e);
         }
     }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/AuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/AuthenticationProvider.java
index 179eac8df..1de8a7411 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/AuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/AuthenticationProvider.java
@@ -18,7 +18,11 @@
 
 package org.apache.zookeeper.server.auth;
 
+import java.util.ArrayList;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.ServerCnxn;
 
 /**
@@ -49,6 +53,21 @@ public interface AuthenticationProvider {
      */
     KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData);
 
+    /**
+     * This method is called when admin server command passes authentication data for this
+     * scheme.
+     *
+     * @param request
+     *                the request that contains the authentication information.
+     * @param authData
+     *                the authentication data received.
+     * @return Ids
+     *                the list of Id. Empty list means not authenticated
+     */
+    default List<Id> handleAuthentication(HttpServletRequest request, byte[] authData) {
+        return new ArrayList<>();
+    }
+
     /**
      * This method is called to see if the given id matches the given id
      * expression in the ACL. This allows schemes to use application specific
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
index 6e2687521..1a13d5a1a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/DigestAuthenticationProvider.java
@@ -21,6 +21,10 @@ package org.apache.zookeeper.server.auth;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.ServerCnxn;
@@ -123,18 +127,19 @@ public class DigestAuthenticationProvider implements AuthenticationProvider {
     }
 
     public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
-        String id = new String(authData);
-        try {
-            String digest = generateDigest(id);
-            if (digest.equals(superDigest)) {
-                cnxn.addAuthInfo(new Id("super", ""));
-            }
-            cnxn.addAuthInfo(new Id(getScheme(), digest));
-            return KeeperException.Code.OK;
-        } catch (NoSuchAlgorithmException e) {
-            LOG.error("Missing algorithm", e);
+        final List<Id> ids = handleAuthentication(authData);
+        if (ids.isEmpty()) {
+            return KeeperException.Code.AUTHFAILED;
+        }
+        for (Id id : ids) {
+            cnxn.addAuthInfo(id);
         }
-        return KeeperException.Code.AUTHFAILED;
+        return KeeperException.Code.OK;
+    }
+
+    @Override
+    public List<Id> handleAuthentication(HttpServletRequest request, byte[] authData) {
+        return handleAuthentication(authData);
     }
 
     public boolean isAuthenticated() {
@@ -170,4 +175,18 @@ public class DigestAuthenticationProvider implements AuthenticationProvider {
         }
     }
 
+    private List<Id> handleAuthentication(final byte[] authData) {
+        final List<Id> ids = new ArrayList<>();
+        final String id = new String(authData);
+        try {
+            final String digest = generateDigest(id);
+            if (digest.equals(superDigest)) {
+                ids.add(new Id("super", ""));
+            }
+            ids.add(new Id(getScheme(), digest));
+        } catch (final NoSuchAlgorithmException e) {
+            LOG.error("Missing algorithm", e);
+        }
+        return Collections.unmodifiableList(ids);
+    }
 }
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
index 9f6fb4005..9334f7c5e 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/IPAuthenticationProvider.java
@@ -18,6 +18,9 @@
 
 package org.apache.zookeeper.server.auth;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.StringTokenizer;
 import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
@@ -25,7 +28,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.ServerCnxn;
 
 public class IPAuthenticationProvider implements AuthenticationProvider {
-    private static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For";
+    public static final String X_FORWARDED_FOR_HEADER_NAME = "X-Forwarded-For";
 
     public String getScheme() {
         return "ip";
@@ -37,6 +40,16 @@ public class IPAuthenticationProvider implements AuthenticationProvider {
         return KeeperException.Code.OK;
     }
 
+    @Override
+    public List<Id> handleAuthentication(HttpServletRequest request, byte[] authData) {
+        final List<Id> ids = new ArrayList<>();
+
+        final String ip = getClientIPAddress(request);
+        ids.add(new Id(getScheme(), ip));
+
+        return Collections.unmodifiableList(ids);
+    }
+
     // This is a bit weird but we need to return the address and the number of
     // bytes (to distinguish between IPv4 and IPv6
     private byte[] addr2Bytes(String addr) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/WrappedAuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/WrappedAuthenticationProvider.java
index 65dc4376b..6347ede5a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/WrappedAuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/WrappedAuthenticationProvider.java
@@ -18,7 +18,10 @@
 
 package org.apache.zookeeper.server.auth;
 
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.ServerCnxn;
 
 /**
@@ -52,6 +55,17 @@ class WrappedAuthenticationProvider extends ServerAuthenticationProvider {
         return implementation.handleAuthentication(serverObjs.getCnxn(), authData);
     }
 
+    /**
+     * {@inheritDoc}
+     *
+     * forwards to older method {@link #handleAuthentication(HttpServletRequest, byte[])}
+     * @return
+     */
+    @Override
+    public List<Id> handleAuthentication(HttpServletRequest request, byte[] authData) {
+        return implementation.handleAuthentication(request, authData);
+    }
+
     /**
      * {@inheritDoc}
      *
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
index 255e5cf8a..9dd27603a 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/auth/X509AuthenticationProvider.java
@@ -20,9 +20,14 @@ package org.apache.zookeeper.server.auth;
 
 import java.security.cert.CertificateException;
 import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import javax.net.ssl.X509KeyManager;
 import javax.net.ssl.X509TrustManager;
 import javax.security.auth.x500.X500Principal;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.common.ClientX509Util;
 import org.apache.zookeeper.common.X509Exception;
@@ -53,6 +58,7 @@ import org.slf4j.LoggerFactory;
  */
 public class X509AuthenticationProvider implements AuthenticationProvider {
 
+    public static final String X509_CERTIFICATE_ATTRIBUTE_NAME = "javax.servlet.request.X509Certificate";
     static final String ZOOKEEPER_X509AUTHENTICATIONPROVIDER_SUPERUSER = "zookeeper.X509AuthenticationProvider.superUser";
     private static final Logger LOG = LoggerFactory.getLogger(X509AuthenticationProvider.class);
     private final X509TrustManager trustManager;
@@ -143,39 +149,25 @@ public class X509AuthenticationProvider implements AuthenticationProvider {
     public KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte[] authData) {
         X509Certificate[] certChain = (X509Certificate[]) cnxn.getClientCertificateChain();
 
-        if (certChain == null || certChain.length == 0) {
+        final Collection<Id> ids = handleAuthentication(certChain);
+        if (ids.isEmpty()) {
+            LOG.error("Failed to authenticate session 0x{}", Long.toHexString(cnxn.getSessionId()));
             return KeeperException.Code.AUTHFAILED;
         }
 
-        if (trustManager == null) {
-            LOG.error("No trust manager available to authenticate session 0x{}", Long.toHexString(cnxn.getSessionId()));
-            return KeeperException.Code.AUTHFAILED;
+        for (Id id : ids) {
+            cnxn.addAuthInfo(id);
         }
-
-        X509Certificate clientCert = certChain[0];
-
-        try {
-            // Authenticate client certificate
-            trustManager.checkClientTrusted(certChain, clientCert.getPublicKey().getAlgorithm());
-        } catch (CertificateException ce) {
-            LOG.error("Failed to trust certificate for session 0x{}", Long.toHexString(cnxn.getSessionId()), ce);
-            return KeeperException.Code.AUTHFAILED;
-        }
-
-        String clientId = getClientId(clientCert);
-
-        if (clientId.equals(System.getProperty(ZOOKEEPER_X509AUTHENTICATIONPROVIDER_SUPERUSER))) {
-            cnxn.addAuthInfo(new Id("super", clientId));
-            LOG.info("Authenticated Id '{}' as super user", clientId);
-        }
-
-        Id authInfo = new Id(getScheme(), clientId);
-        cnxn.addAuthInfo(authInfo);
-
-        LOG.info("Authenticated Id '{}' for Scheme '{}'", authInfo.getId(), authInfo.getScheme());
         return KeeperException.Code.OK;
     }
 
+    @Override
+    public List<Id> handleAuthentication(HttpServletRequest request, byte[] authData) {
+        final X509Certificate[] certChain =
+                (X509Certificate[]) request.getAttribute(X509_CERTIFICATE_ATTRIBUTE_NAME);
+        return handleAuthentication(certChain);
+    }
+
     /**
      * Determine the string to be used as the remote host session Id for
      * authorization purposes. Associate this client identifier with a
@@ -242,4 +234,36 @@ public class X509AuthenticationProvider implements AuthenticationProvider {
         return keyManager;
     }
 
+    private List<Id> handleAuthentication(final X509Certificate[] certChain) {
+        final List<Id> ids = new ArrayList<>();
+        if (certChain == null || certChain.length == 0) {
+            LOG.warn("No certificate chain available to authenticate");
+            return ids;
+        }
+
+        if (trustManager == null) {
+            LOG.error("No trust manager available to authenticate");
+            return ids;
+        }
+
+        final X509Certificate clientCert = certChain[0];
+        try {
+            // Authenticate client certificate
+            trustManager.checkClientTrusted(certChain, clientCert.getPublicKey().getAlgorithm());
+        } catch (CertificateException ce) {
+            LOG.error("Failed to trust certificate", ce);
+            return ids;
+        }
+
+        final String clientId = getClientId(clientCert);
+        if (clientId.equals(System.getProperty(ZOOKEEPER_X509AUTHENTICATIONPROVIDER_SUPERUSER))) {
+            ids.add(new Id("super", clientId));
+            LOG.info("Authenticated Id '{}' as super user", clientId);
+        }
+
+        final Id id = new Id(getScheme(), clientId);
+        ids.add(id);
+        LOG.info("Authenticated Id '{}' for scheme '{}'", id.getId(), id.getScheme());
+        return Collections.unmodifiableList(ids);
+    }
 }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandAuthTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandAuthTest.java
new file mode 100644
index 000000000..a6f200a10
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandAuthTest.java
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.admin;
+
+import static org.apache.zookeeper.ZooDefs.Ids.OPEN_ACL_UNSAFE;
+import static org.apache.zookeeper.server.admin.Commands.AUTH_INFO_SEPARATOR;
+import static org.apache.zookeeper.server.admin.Commands.ROOT_PATH;
+import static org.apache.zookeeper.server.admin.JettyAdminServerTest.HTTPS_URL_FORMAT;
+import static org.junit.Assert.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.io.File;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.X509KeyManager;
+import javax.net.ssl.X509TrustManager;
+import javax.servlet.http.HttpServletResponse;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.ClientX509Util;
+import org.apache.zookeeper.common.QuorumX509Util;
+import org.apache.zookeeper.common.X509Exception;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.NettyServerCnxnFactory;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
+import org.apache.zookeeper.server.auth.ProviderRegistry;
+import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
+import org.apache.zookeeper.test.ClientBase;
+import org.eclipse.jetty.http.HttpHeader;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class CommandAuthTest extends ZKTestCase {
+    private static final String DIGEST_SCHEMA = "digest";
+    private static final String X509_SCHEMA = "x509";
+    private static final String IP_SCHEMA = "ip";
+    private static final String ROOT_USER = "root";
+    private static final String ROOT_PASSWORD = "root_passwd";
+    private static final String AUTH_TEST_COMMAND_NAME = "authtest";
+    private static final String X509_SUBJECT_PRINCIPAL = "CN=localhost,OU=ZooKeeper,O=Apache,L=Unknown,ST=Unknown,C=Unknown";
+
+    public enum AuthSchema {
+        DIGEST,
+        X509,
+        IP
+    }
+
+    private final int jettyAdminPort = PortAssignment.unique();
+    private final String hostPort = "127.0.0.1:" + PortAssignment.unique();
+    private final ClientX509Util clientX509Util = new ClientX509Util();
+    private final QuorumX509Util quorumX509Util = new QuorumX509Util();
+    private ZooKeeperServer zks;
+    private ServerCnxnFactory cnxnFactory;
+    private JettyAdminServer adminServer;
+    private ZooKeeper zk;
+
+    @TempDir
+    static File dataDir;
+
+    @TempDir
+    static File logDir;
+
+    @BeforeAll
+    public void setup() throws Exception {
+        Commands.registerCommand(new AuthTestCommand(true, ZooDefs.Perms.ALL, ROOT_PATH));
+
+        setupTLS();
+
+        // start ZookeeperServer
+        System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+        zks = new ZooKeeperServer(dataDir, logDir, 3000);
+        final int port = Integer.parseInt(hostPort.split(":")[1]);
+        cnxnFactory = ServerCnxnFactory.createFactory(port, -1);
+        cnxnFactory.startup(zks);
+        assertTrue(ClientBase.waitForServerUp(hostPort, 120000));
+
+        // start AdminServer
+        System.setProperty("zookeeper.admin.enableServer", "true");
+        System.setProperty("zookeeper.admin.serverPort", String.valueOf(jettyAdminPort));
+        adminServer = new JettyAdminServer();
+        adminServer.setZooKeeperServer(zks);
+        adminServer.start();
+    }
+
+    @AfterAll
+    public void tearDown() throws Exception {
+        clearTLS();
+
+        System.clearProperty("zookeeper.4lw.commands.whitelist");
+        System.clearProperty("zookeeper.admin.enableServer");
+        System.clearProperty("zookeeper.admin.serverPort");
+
+        if (adminServer != null) {
+            adminServer.shutdown();
+        }
+
+        if (cnxnFactory != null) {
+            cnxnFactory.shutdown();
+        }
+
+        if (zks != null) {
+            zks.shutdown();
+        }
+    }
+
+    @BeforeEach
+    public void setupEach() throws Exception {
+        zk = ClientBase.createZKClient(hostPort);
+    }
+
+    @AfterEach
+    public void tearDownEach() throws Exception {
+        if (zk != null) {
+            zk.close();
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(AuthSchema.class)
+    public void testAuthCheck_authorized(final AuthSchema authSchema) throws Exception {
+        setupRootACL(authSchema);
+        try {
+            final HttpURLConnection authTestConn = sendAuthTestCommandRequest(authSchema, true);
+            assertEquals(HttpURLConnection.HTTP_OK, authTestConn.getResponseCode());
+        } finally {
+            addAuthInfo(zk, authSchema);
+            resetRootACL(zk);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = AuthSchema.class, names = {"DIGEST"})
+    public void testAuthCheck_notAuthorized(final AuthSchema authSchema) throws Exception {
+        setupRootACL(authSchema);
+        try {
+            final HttpURLConnection authTestConn = sendAuthTestCommandRequest(authSchema, false);
+            assertEquals(HttpURLConnection.HTTP_FORBIDDEN, authTestConn.getResponseCode());
+        } finally {
+            addAuthInfo(zk, authSchema);
+            resetRootACL(zk);
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource(AuthSchema.class)
+    public void testAuthCheck_noACL(final AuthSchema authSchema) throws Exception {
+        final HttpURLConnection authTestConn = sendAuthTestCommandRequest(authSchema, false);
+        assertEquals(HttpURLConnection.HTTP_OK, authTestConn.getResponseCode());
+    }
+
+    @Test
+    public void testAuthCheck_invalidServerRequiredConfig() {
+        assertThrows("An active server is required for auth check",
+        IllegalArgumentException.class,
+        () -> new AuthTestCommand(false, ZooDefs.Perms.ALL, ROOT_PATH));
+    }
+
+    @Test
+    public void testAuthCheck_noAuthInfo() {
+        testAuthCheck_invalidAuthInfo(null);
+    }
+
+    @Test
+    public void testAuthCheck_noAuthInfoSeparator() {
+        final String invalidAuthInfo =  String.format("%s%s%s:%s", DIGEST_SCHEMA, "", ROOT_USER, ROOT_PASSWORD);
+        testAuthCheck_invalidAuthInfo(invalidAuthInfo);
+    }
+
+    @Test
+    public void testAuthCheck_invalidAuthInfoSeparator() {
+        final String invalidAuthInfo =  String.format("%s%s%s:%s", DIGEST_SCHEMA, ":", ROOT_USER, ROOT_PASSWORD);
+        testAuthCheck_invalidAuthInfo(invalidAuthInfo);
+    }
+
+    @Test
+    public void testAuthCheck_invalidAuthSchema() {
+        final String invalidAuthInfo =  String.format("%s%s%s:%s", "InvalidAuthSchema", AUTH_INFO_SEPARATOR, ROOT_USER, ROOT_PASSWORD);
+        testAuthCheck_invalidAuthInfo(invalidAuthInfo);
+    }
+
+    @Test
+    public void testAuthCheck_authProviderNotFound() {
+        final String invalidAuthInfo =  String.format("%s%s%s:%s", "sasl", AUTH_INFO_SEPARATOR, ROOT_USER, ROOT_PASSWORD);
+        testAuthCheck_invalidAuthInfo(invalidAuthInfo);
+    }
+
+    private void testAuthCheck_invalidAuthInfo(final String invalidAuthInfo) {
+        final CommandResponse commandResponse = Commands.runGetCommand(AUTH_TEST_COMMAND_NAME, zks, new HashMap<>(), invalidAuthInfo, null);
+        assertEquals(HttpServletResponse.SC_UNAUTHORIZED, commandResponse.getStatusCode());
+    }
+
+    private static class AuthTestCommand extends GetCommand {
+        public AuthTestCommand(final boolean serverRequired, final int perm, final String path) {
+            super(Arrays.asList(AUTH_TEST_COMMAND_NAME, "at"), serverRequired, new AuthRequest(perm, path));
+        }
+
+        @Override
+        public CommandResponse runGet(ZooKeeperServer zkServer, Map<String, String> kwargs) {
+            return initializeResponse();
+        }
+    }
+
+    private void setupTLS() throws Exception {
+        System.setProperty("zookeeper.authProvider.x509", "org.apache.zookeeper.server.auth.X509AuthenticationProvider");
+        String testDataPath = System.getProperty("test.data.dir", "src/test/resources/data");
+
+        System.setProperty(clientX509Util.getSslKeystoreLocationProperty(), testDataPath + "/ssl/testKeyStore.jks");
+        System.setProperty(clientX509Util.getSslKeystorePasswdProperty(), "testpass");
+        System.setProperty(clientX509Util.getSslTruststoreLocationProperty(), testDataPath + "/ssl/testTrustStore.jks");
+        System.setProperty(clientX509Util.getSslTruststorePasswdProperty(), "testpass");
+
+        // client
+        System.setProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET, "org.apache.zookeeper.ClientCnxnSocketNetty");
+        System.setProperty(ZKClientConfig.SECURE_CLIENT, "true");
+
+        // server
+        System.setProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY, "org.apache.zookeeper.server.NettyServerCnxnFactory");
+        System.setProperty(NettyServerCnxnFactory.PORT_UNIFICATION_KEY, Boolean.TRUE.toString());
+
+        // admin server
+        System.setProperty(quorumX509Util.getSslKeystoreLocationProperty(), testDataPath + "/ssl/testKeyStore.jks");
+        System.setProperty(quorumX509Util.getSslKeystorePasswdProperty(), "testpass");
+        System.setProperty(quorumX509Util.getSslTruststoreLocationProperty(), testDataPath + "/ssl/testTrustStore.jks");
+        System.setProperty(quorumX509Util.getSslTruststorePasswdProperty(), "testpass");
+        System.setProperty("zookeeper.admin.forceHttps", "true");
+        System.setProperty("zookeeper.admin.needClientAuth", "true");
+
+        // create SSLContext
+        final SSLContext sslContext = SSLContext.getInstance(ClientX509Util.DEFAULT_PROTOCOL);
+        final X509AuthenticationProvider authProvider = (X509AuthenticationProvider) ProviderRegistry.getProvider("x509");
+        if (authProvider == null) {
+            throw new X509Exception.SSLContextException("Could not create SSLContext with x509 auth provider");
+        }
+        sslContext.init(new X509KeyManager[]{authProvider.getKeyManager()}, new X509TrustManager[]{authProvider.getTrustManager()}, null);
+
+        // set SSLSocketFactory
+        HttpsURLConnection.setDefaultSSLSocketFactory(sslContext.getSocketFactory());
+    }
+
+    public void clearTLS() {
+        System.clearProperty("zookeeper.authProvider.x509");
+
+        System.clearProperty(clientX509Util.getSslKeystoreLocationProperty());
+        System.clearProperty(clientX509Util.getSslKeystorePasswdProperty());
+        System.clearProperty(clientX509Util.getSslTruststoreLocationProperty());
+        System.clearProperty(clientX509Util.getSslTruststorePasswdProperty());
+
+        // client side
+        System.clearProperty(ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET);
+        System.clearProperty(ZKClientConfig.SECURE_CLIENT);
+
+        // server side
+        System.clearProperty(ServerCnxnFactory.ZOOKEEPER_SERVER_CNXN_FACTORY);
+        System.clearProperty(NettyServerCnxnFactory.PORT_UNIFICATION_KEY);
+
+        // admin server
+        System.clearProperty(quorumX509Util.getSslKeystoreLocationProperty());
+        System.clearProperty(quorumX509Util.getSslKeystorePasswdProperty());
+        System.clearProperty(quorumX509Util.getSslTruststoreLocationProperty());
+        System.clearProperty(quorumX509Util.getSslTruststorePasswdProperty());
+        System.clearProperty("zookeeper.admin.forceHttps");
+        System.clearProperty("zookeeper.admin.needClientAuth");
+    }
+
+    private void setupRootACL(final AuthSchema authSchema) throws Exception {
+        switch (authSchema) {
+            case DIGEST:
+                setupRootACLForDigest(zk);
+                break;
+            case X509:
+                setupRootACLForX509(zk);
+                break;
+            case IP:
+                setupRootACLForIP(zk);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown auth schema");
+        }
+    }
+
+    private HttpURLConnection sendAuthTestCommandRequest(final AuthSchema authSchema, final boolean validAuthInfo) throws Exception  {
+        final URL authTestURL = new URL(String.format(HTTPS_URL_FORMAT + "/" + AUTH_TEST_COMMAND_NAME, jettyAdminPort));
+        final HttpURLConnection authTestConn = (HttpURLConnection) authTestURL.openConnection();
+        addAuthHeader(authTestConn, authSchema, validAuthInfo);
+        authTestConn.setRequestMethod("GET");
+        return authTestConn;
+    }
+
+    private void addAuthInfo(final ZooKeeper zk, final AuthSchema authSchema) {
+        switch (authSchema) {
+            case DIGEST:
+                addAuthInfoForDigest(zk);
+                break;
+            case X509:
+                addAuthInfoForX509(zk);
+                break;
+            case IP:
+                addAuthInfoForIP(zk);
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown auth schema");
+        }
+    }
+
+    public static void resetRootACL(final ZooKeeper zk) throws Exception {
+        zk.setACL(Commands.ROOT_PATH, OPEN_ACL_UNSAFE, -1);
+    }
+
+    public static void setupRootACLForDigest(final ZooKeeper zk) throws Exception  {
+        final String idPassword = String.format("%s:%s", ROOT_USER, ROOT_PASSWORD);
+        final String digest = DigestAuthenticationProvider.generateDigest(idPassword);
+
+        final ACL acl = new ACL(ZooDefs.Perms.ALL, new Id(DIGEST_SCHEMA, digest));
+        zk.setACL(Commands.ROOT_PATH, Collections.singletonList(acl), -1);
+    }
+
+    private static void setupRootACLForX509(final ZooKeeper zk) throws Exception  {
+        final ACL acl = new ACL(ZooDefs.Perms.ALL, new Id(X509_SCHEMA, X509_SUBJECT_PRINCIPAL));
+        zk.setACL(Commands.ROOT_PATH, Collections.singletonList(acl), -1);
+    }
+
+    private static void setupRootACLForIP(final ZooKeeper zk) throws Exception  {
+        final ACL acl = new ACL(ZooDefs.Perms.ALL, new Id(IP_SCHEMA, "127.0.0.1"));
+        zk.setACL(Commands.ROOT_PATH, Collections.singletonList(acl), -1);
+    }
+
+    public static void addAuthInfoForDigest(final ZooKeeper zk) {
+        final String idPassword = String.format("%s:%s", ROOT_USER, ROOT_PASSWORD);
+        zk.addAuthInfo(DIGEST_SCHEMA, idPassword.getBytes(StandardCharsets.UTF_8));
+    }
+
+    public static void addAuthInfoForX509(final ZooKeeper zk) {
+        zk.addAuthInfo(X509_SCHEMA, X509_SUBJECT_PRINCIPAL.getBytes(StandardCharsets.UTF_8));
+    }
+
+    private void addAuthInfoForIP(final ZooKeeper zk) {
+        zk.addAuthInfo(IP_SCHEMA, "127.0.0.1".getBytes(StandardCharsets.UTF_8));
+    }
+
+    public static void addAuthHeader(final HttpURLConnection conn, final AuthSchema authSchema, final boolean validAuthInfo) {
+        String authInfo;
+        switch (authSchema) {
+            case DIGEST:
+                authInfo = validAuthInfo ? buildAuthorizationForDigest() : buildInvalidAuthorizationForDigest();
+                break;
+            case X509:
+                authInfo = buildAuthorizationForX509();
+                break;
+            case IP:
+                authInfo = buildAuthorizationForIP();
+                break;
+            default:
+                throw new IllegalArgumentException("Unknown auth schema");
+        }
+        conn.setRequestProperty(HttpHeader.AUTHORIZATION.asString(), authInfo);
+    }
+
+    public static String buildAuthorizationForDigest() {
+        return  String.format("%s%s%s:%s", DIGEST_SCHEMA, Commands.AUTH_INFO_SEPARATOR, ROOT_USER, ROOT_PASSWORD);
+    }
+
+    private static String buildInvalidAuthorizationForDigest() {
+        return  String.format("%s%s%s:%s", DIGEST_SCHEMA, Commands.AUTH_INFO_SEPARATOR, "InvalidUser", "InvalidPassword");
+    }
+
+    private static String buildAuthorizationForX509() {
+        return  String.format("%s%s", X509_SCHEMA, Commands.AUTH_INFO_SEPARATOR);
+    }
+
+    private static String buildAuthorizationForIP() {
+        return  String.format("%s%s", IP_SCHEMA, Commands.AUTH_INFO_SEPARATOR);
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
index 85c4a78e9..93c318787 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/CommandsTest.java
@@ -61,6 +61,8 @@ public class CommandsTest extends ClientBase {
      *            - keyword arguments to the command
      * @param inputStream
      *            - InputStream to the command
+     * @param authInfo
+     *            - authInfo for the command
      * @param expectedHeaders
      *            - expected HTTP response headers
      * @param expectedStatusCode
@@ -71,11 +73,12 @@ public class CommandsTest extends ClientBase {
      * @throws InterruptedException
      */
     private void testCommand(String cmdName, Map<String, String> kwargs, InputStream inputStream,
+                             String authInfo,
                              Map<String, String> expectedHeaders, int expectedStatusCode,
                              Field... fields) throws IOException, InterruptedException {
         ZooKeeperServer zks = serverFactory.getZooKeeperServer();
         final CommandResponse commandResponse = inputStream == null
-        ? Commands.runGetCommand(cmdName, zks, kwargs) : Commands.runPostCommand(cmdName, zks, inputStream);
+        ? Commands.runGetCommand(cmdName, zks, kwargs, authInfo, null) : Commands.runPostCommand(cmdName, zks, inputStream, authInfo, null);
         assertNotNull(commandResponse);
         assertEquals(expectedStatusCode, commandResponse.getStatusCode());
         try (final InputStream responseStream = commandResponse.getInputStream()) {
@@ -107,7 +110,7 @@ public class CommandsTest extends ClientBase {
     }
 
     public void testCommand(String cmdName, Field... fields) throws IOException, InterruptedException {
-        testCommand(cmdName, new HashMap<>(), null, new HashMap<>(), HttpServletResponse.SC_OK, fields);
+        testCommand(cmdName, new HashMap<>(), null, null, new HashMap<>(), HttpServletResponse.SC_OK, fields);
     }
 
     private static class Field {
@@ -232,7 +235,8 @@ public class CommandsTest extends ClientBase {
         try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes())){
             final Map<String, String> kwargs = new HashMap<>();
             final Map<String, String> expectedHeaders = new HashMap<>();
-            testCommand("restore", kwargs, inputStream, expectedHeaders, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
+            final String authInfo = CommandAuthTest.buildAuthorizationForDigest();
+            testCommand("restore", kwargs, inputStream, authInfo, expectedHeaders, HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
         } finally {
             clearForRestoreCommand();
         }
@@ -243,7 +247,9 @@ public class CommandsTest extends ClientBase {
         setupForRestoreCommand();
         final ZooKeeperServer zks = serverFactory.getZooKeeperServer();
         try {
-            final CommandResponse commandResponse = Commands.runPostCommand("restore", zks, null);
+
+            final String authInfo = CommandAuthTest.buildAuthorizationForDigest();
+            final CommandResponse commandResponse = Commands.runPostCommand("restore", zks, null, authInfo, null);
             assertNotNull(commandResponse);
             assertEquals(HttpServletResponse.SC_BAD_REQUEST, commandResponse.getStatusCode());
         } finally {
@@ -273,7 +279,7 @@ public class CommandsTest extends ClientBase {
     public void testSetTraceMask() throws IOException, InterruptedException {
         Map<String, String> kwargs = new HashMap<>();
         kwargs.put("traceMask", "1");
-        testCommand("set_trace_mask", kwargs, null, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
+        testCommand("set_trace_mask", kwargs, null, null, new HashMap<>(), HttpServletResponse.SC_OK, new Field("tracemask", Long.class));
     }
 
     @Test
@@ -360,10 +366,11 @@ public class CommandsTest extends ClientBase {
         try {
             final Map<String, String> kwargs = new HashMap<>();
             kwargs.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
+            final String autInfo = CommandAuthTest.buildAuthorizationForDigest();
             final Map<String, String> expectedHeaders = new HashMap<>();
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID, "0x0");
             expectedHeaders.put(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE, "478");
-            testCommand("snapshot", kwargs, null, expectedHeaders, HttpServletResponse.SC_OK);
+            testCommand("snapshot", kwargs, null, autInfo, expectedHeaders, HttpServletResponse.SC_OK);
         } finally {
             System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
             System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/RestoreQuorumTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/RestoreQuorumTest.java
new file mode 100644
index 000000000..38ca7f897
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/RestoreQuorumTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.admin;
+
+import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
+import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
+import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
+import static org.apache.zookeeper.server.admin.SnapshotAndRestoreCommandTest.performRestoreAndValidate;
+import static org.apache.zookeeper.server.admin.SnapshotAndRestoreCommandTest.takeSnapshotAndValidate;
+import java.io.File;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class RestoreQuorumTest extends QuorumPeerTestBase {
+    @Test
+    public void testRestoreAfterQuorumLost() throws Exception {
+        setupAdminServerProperties();
+
+        int SERVER_COUNT = 3;
+        final int NODE_COUNT = 10;
+        final String PATH = "/testRestoreAfterQuorumLost";
+
+        try {
+            // start up servers
+            servers = LaunchServers(SERVER_COUNT);
+            int leaderId = servers.findLeader();
+
+            // create data
+            servers.zk[leaderId].create(PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            for (int i = 0; i < NODE_COUNT; i++) {
+                servers.zk[leaderId].create(PATH + "/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // take snapshot
+            final File snapshotFile = takeSnapshotAndValidate(servers.adminPorts[leaderId], ClientBase.testBaseDir);
+
+            // create more data
+            for (int i = NODE_COUNT; i < NODE_COUNT * 2; i++) {
+                servers.zk[leaderId].create(PATH + "/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+            }
+
+            // shutdown all servers to simulate quorum lost
+            servers.shutDownAllServers();
+            waitForAll(servers, ZooKeeper.States.CONNECTING);
+
+            // restart the servers
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                System.setProperty("zookeeper.admin.serverPort", String.valueOf(servers.adminPorts[i]));
+                servers.mt[i].start();
+                servers.restartClient(i, this);
+            }
+            waitForAll(servers, ZooKeeper.States.CONNECTED);
+
+            // restore servers
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                performRestoreAndValidate(servers.adminPorts[i], snapshotFile);
+            }
+
+            // validate all servers are restored
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                servers.restartClient(i, this);
+                Assertions.assertEquals(NODE_COUNT, servers.zk[i].getAllChildrenNumber(PATH));
+            }
+
+            // create more data after restore
+            for (int i = NODE_COUNT * 2; i < NODE_COUNT * 3; i++) {
+                servers.zk[leaderId].create(PATH + "/" + i, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
+            }
+
+            // validate all servers have expected data
+            for (int i = 0; i < SERVER_COUNT; i++) {
+                Assertions.assertEquals(NODE_COUNT * 2, servers.zk[i].getAllChildrenNumber(PATH));
+            }
+        } finally {
+            clearAdminServerProperties();
+        }
+    }
+
+    private void setupAdminServerProperties() {
+        System.setProperty("zookeeper.admin.enableServer", "true");
+        System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
+        System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
+        System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
+        System.setProperty(ADMIN_RESTORE_ENABLED, "true");
+    }
+
+    private void clearAdminServerProperties() {
+        System.clearProperty("zookeeper.admin.enableServer");
+        System.clearProperty("zookeeper.admin.serverPort");
+        System.clearProperty(ADMIN_RATE_LIMITER_INTERVAL);
+        System.clearProperty(ADMIN_SNAPSHOT_ENABLED);
+        System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
+        System.clearProperty(ADMIN_RESTORE_ENABLED);
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
index f7d357e72..a2f31fe86 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/admin/SnapshotAndRestoreCommandTest.java
@@ -19,6 +19,9 @@
 package org.apache.zookeeper.server.admin;
 
 import static org.apache.zookeeper.server.ZooKeeperServer.ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED;
+import static org.apache.zookeeper.server.admin.CommandAuthTest.addAuthInfoForDigest;
+import static org.apache.zookeeper.server.admin.CommandAuthTest.resetRootACL;
+import static org.apache.zookeeper.server.admin.CommandAuthTest.setupRootACLForDigest;
 import static org.apache.zookeeper.server.admin.Commands.ADMIN_RATE_LIMITER_INTERVAL;
 import static org.apache.zookeeper.server.admin.Commands.RestoreCommand.ADMIN_RESTORE_ENABLED;
 import static org.apache.zookeeper.server.admin.Commands.SnapshotCommand.ADMIN_SNAPSHOT_ENABLED;
@@ -101,7 +104,8 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
 
         // start AdminServer
         System.setProperty("zookeeper.admin.enableServer", "true");
-        System.setProperty("zookeeper.admin.serverPort", "" + jettyAdminPort);
+        System.setProperty("zookeeper.admin.serverPort", String.valueOf(jettyAdminPort));
+
         System.setProperty(ADMIN_RATE_LIMITER_INTERVAL, "0");
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "true");
         System.setProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED, "true");
@@ -111,8 +115,16 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         adminServer.setZooKeeperServer(zks);
         adminServer.start();
 
-        // create Zookeeper client and test data
+        // create Zookeeper client
         zk = ClientBase.createZKClient(hostPort);
+
+        // setup root ACL
+        setupRootACLForDigest(zk);
+
+        // add auth
+        addAuthInfoForDigest(zk);
+
+        // create test data
         createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT);
     }
 
@@ -126,6 +138,8 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         System.clearProperty(ZOOKEEPER_SERIALIZE_LAST_PROCESSED_ZXID_ENABLED);
         System.clearProperty(ADMIN_RESTORE_ENABLED);
 
+        resetRootACL(zk);
+
         if (zk != null) {
             zk.close();
         }
@@ -148,16 +162,17 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         ServerMetrics.getMetrics().resetAll();
 
         // take snapshot with streaming and validate
-        final File snapshotFile = takeSnapshotAndValidate();
+        final File snapshotFile = takeSnapshotAndValidate(jettyAdminPort, dataDir);
 
         // validate snapshot metrics
         validateSnapshotMetrics();
 
         // restore from snapshot and validate
-        performRestoreAndValidate(snapshotFile);
+        performRestoreAndValidate(jettyAdminPort, snapshotFile);
 
         // validate creating data after restore
         try (final ZooKeeper zk = ClientBase.createZKClient(hostPort)) {
+            addAuthInfoForDigest(zk);
             createData(zk, SNAPSHOT_TEST_PATH, NODE_COUNT + 1);
             assertEquals(NODE_COUNT + NODE_COUNT + 1, zk.getAllChildrenNumber(SNAPSHOT_TEST_PATH));
         }
@@ -173,7 +188,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         final String restoreTestPath = "/restore_test";
 
         // take snapshot
-        final File snapshotFile = takeSnapshotAndValidate();
+        final File snapshotFile = takeSnapshotAndValidate(jettyAdminPort, dataDir);
 
         final ExecutorService service = Executors.newFixedThreadPool(threadCount);
         final CountDownLatch latch = new CountDownLatch(threadCount);
@@ -196,7 +211,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         // thread 2 performs restore operation
         service.submit(() -> {
             try {
-                performRestoreAndValidate(snapshotFile);
+                performRestoreAndValidate(jettyAdminPort, snapshotFile);
                 restoreSucceeded.set(true);
             } catch (final Exception e) {
                 LOG.error(e.getMessage());
@@ -218,18 +233,18 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
     @Test
     public void testRestores() throws Exception {
         // take snapshot
-        final File snapshotFile = takeSnapshotAndValidate();
+        final File snapshotFile = takeSnapshotAndValidate(jettyAdminPort, dataDir);
 
         // perform restores
         for (int i = 0; i < 3; i++) {
-            performRestoreAndValidate(snapshotFile);
+            performRestoreAndValidate(jettyAdminPort, snapshotFile);
         }
     }
 
     @Test
     public void testSnapshotCommand_nonStreaming() throws Exception {
         // take snapshot without streaming
-        final HttpURLConnection snapshotConn = sendSnapshotRequest(false);
+        final HttpURLConnection snapshotConn = sendSnapshotRequest(false, jettyAdminPort);
 
         // validate snapshot response
         assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
@@ -242,7 +257,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         System.setProperty(ADMIN_SNAPSHOT_ENABLED, "false");
         try {
             // take snapshot
-            final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+            final HttpURLConnection snapshotConn = sendSnapshotRequest(true, jettyAdminPort);
 
             // validate snapshot response
             assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, snapshotConn.getResponseCode());
@@ -256,7 +271,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
         try {
             // take snapshot
-            final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+            final HttpURLConnection snapshotConn = sendSnapshotRequest(true, jettyAdminPort);
 
             // validate snapshot response
             assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, snapshotConn.getResponseCode());
@@ -269,7 +284,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
     public void testRestoreCommand_disabled() throws Exception {
         System.setProperty(ADMIN_RESTORE_ENABLED, "false");
         try {
-            final HttpURLConnection restoreConn = sendRestoreRequest();
+            final HttpURLConnection restoreConn = sendRestoreRequest(jettyAdminPort);
             assertEquals(HttpServletResponse.SC_SERVICE_UNAVAILABLE, restoreConn.getResponseCode());
         } finally {
             System.setProperty(ADMIN_RESTORE_ENABLED, "true");
@@ -280,7 +295,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
     public void testRestoreCommand_serializeLastZxidDisabled() throws Exception {
         ZooKeeperServer.setSerializeLastProcessedZxidEnabled(false);
         try {
-            final HttpURLConnection restoreConn = sendRestoreRequest();
+            final HttpURLConnection restoreConn = sendRestoreRequest(jettyAdminPort);
             assertEquals(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, restoreConn.getResponseCode());
         } finally {
             ZooKeeperServer.setSerializeLastProcessedZxidEnabled(true);
@@ -289,7 +304,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
 
     @Test
     public void testRestoreCommand_invalidSnapshotData() throws Exception {
-        final HttpURLConnection restoreConn = sendRestoreRequest();
+        final HttpURLConnection restoreConn = sendRestoreRequest(jettyAdminPort);
         try (final InputStream inputStream = new ByteArrayInputStream("Invalid snapshot data".getBytes());
              final OutputStream outputStream = restoreConn.getOutputStream()) {
             IOUtils.copyBytes(inputStream, outputStream, 1024, true);
@@ -309,25 +324,27 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         }
     }
 
-    private HttpURLConnection sendSnapshotRequest(final boolean streaming) throws Exception  {
+    private static HttpURLConnection sendSnapshotRequest(final boolean streaming, final int jettyAdminPort) throws Exception  {
         final String queryParamsStr = buildQueryStringForSnapshotCommand(streaming);
         final URL snapshotURL = new URL(String.format(URL_FORMAT + "/snapshot", jettyAdminPort) + "?" + queryParamsStr);
         final HttpURLConnection snapshotConn = (HttpURLConnection) snapshotURL.openConnection();
+        CommandAuthTest.addAuthHeader(snapshotConn, CommandAuthTest.AuthSchema.DIGEST, true);
         snapshotConn.setRequestMethod("GET");
 
         return snapshotConn;
     }
 
-    private HttpURLConnection sendRestoreRequest() throws Exception  {
+    private static HttpURLConnection sendRestoreRequest(final int jettyAdminPort) throws Exception  {
         final URL restoreURL = new URL(String.format(URL_FORMAT + "/restore", jettyAdminPort));
         final HttpURLConnection restoreConn = (HttpURLConnection) restoreURL.openConnection();
         restoreConn.setDoOutput(true);
+        CommandAuthTest.addAuthHeader(restoreConn, CommandAuthTest.AuthSchema.DIGEST, true);
         restoreConn.setRequestMethod("POST");
 
         return restoreConn;
     }
 
-    private String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
+    private static String buildQueryStringForSnapshotCommand(final boolean streaming) throws Exception {
         final Map<String, String> parameters = new HashMap<>();
         parameters.put(REQUEST_QUERY_PARAM_STREAMING, String.valueOf(streaming));
         return getParamsString(parameters);
@@ -350,7 +367,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
                 : resultString;
     }
 
-    private void validateResponseHeaders(final HttpURLConnection conn) {
+    private static void validateResponseHeaders(final HttpURLConnection conn) {
         LOG.info("Header:{}, Value:{}",
                 Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID,
                 conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_LAST_ZXID));
@@ -363,7 +380,7 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         assertTrue(Integer.parseInt(conn.getHeaderField(Commands.SnapshotCommand.RESPONSE_HEADER_SNAPSHOT_SIZE)) > 0);
     }
 
-    private void displayResponsePayload(final HttpURLConnection conn) throws IOException {
+    private static void displayResponsePayload(final HttpURLConnection conn) throws IOException {
         final StringBuilder sb = new StringBuilder();
         try (final BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()))) {
             String inputLine;
@@ -388,9 +405,9 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         assertTrue((Double) metrics.get("avg_restore_time") > 0.0);
     }
 
-    private File takeSnapshotAndValidate() throws Exception {
+    public static  File takeSnapshotAndValidate(final int jettyAdminPort, final File dataDir) throws Exception {
         // take snapshot with streaming
-        final HttpURLConnection snapshotConn = sendSnapshotRequest(true);
+        final HttpURLConnection snapshotConn = sendSnapshotRequest(true, jettyAdminPort);
 
         // validate snapshot response
         assertEquals(HttpURLConnection.HTTP_OK, snapshotConn.getResponseCode());
@@ -405,9 +422,9 @@ public class SnapshotAndRestoreCommandTest extends ZKTestCase {
         return snapshotFile;
     }
 
-    private void performRestoreAndValidate(final File snapshotFile) throws Exception {
+    public static void performRestoreAndValidate(final int jettyAdminPort, final File snapshotFile) throws Exception {
         // perform restore
-        final HttpURLConnection restoreConn = sendRestoreRequest();
+        final HttpURLConnection restoreConn = sendRestoreRequest(jettyAdminPort);
         try (final CheckedInputStream is = SnapStream.getInputStream(snapshotFile);
              final OutputStream outputStream = restoreConn.getOutputStream()) {
             IOUtils.copyBytes(is, outputStream, 1024, true);
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
index 8249cd222..2a7b69258 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerTestBase.java
@@ -402,11 +402,12 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
     }
 
     // This class holds the servers and clients for those servers
-    protected static class Servers {
+    public static class Servers {
 
-        MainThread[] mt;
-        ZooKeeper[] zk;
+        public MainThread[] mt;
+        public ZooKeeper[] zk;
         public int[] clientPorts;
+        public int[] adminPorts;
 
         public void shutDownAllServers() throws InterruptedException {
             for (MainThread t : mt) {
@@ -415,9 +416,12 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
         }
 
         public void restartAllServersAndClients(Watcher watcher) throws IOException, InterruptedException {
+            int index = 0;
             for (MainThread t : mt) {
                 if (!t.isAlive()) {
+                    System.setProperty("zookeeper.admin.serverPort", String.valueOf(adminPorts[index]));
                     t.start();
+                    index++;
                 }
             }
             for (int i = 0; i < zk.length; i++) {
@@ -510,6 +514,12 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
                     i, PortAssignment.unique(), PortAssignment.unique(), role,
                     svrs.clientPorts[i]));
         }
+
+        svrs.adminPorts = new int[SERVER_COUNT];
+        for (int i = 0; i < SERVER_COUNT; i++) {
+            svrs.adminPorts[i] = PortAssignment.unique();
+        }
+
         String quorumCfgSection = sb.toString();
 
         svrs.mt = new MainThread[SERVER_COUNT];
@@ -520,6 +530,7 @@ public class QuorumPeerTestBase extends ZKTestCase implements Watcher {
             } else {
                 svrs.mt[i] = new MainThread(i, svrs.clientPorts[i], quorumCfgSection, otherConfigs);
             }
+            System.setProperty("zookeeper.admin.serverPort", String.valueOf(svrs.adminPorts[i]));
             svrs.mt[i].start();
             svrs.restartClient(i, this);
         }
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/IPAuthTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/IPAuthTest.java
new file mode 100644
index 000000000..45ce447fe
--- /dev/null
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/IPAuthTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import java.util.Arrays;
+import java.util.List;
+import javax.servlet.http.HttpServletRequest;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.server.auth.IPAuthenticationProvider;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+public class IPAuthTest extends ZKTestCase {
+    @Test
+    public void testHandleAuthentication_Forwarded() {
+        final IPAuthenticationProvider provider = new IPAuthenticationProvider();
+
+        final HttpServletRequest mockRequest = mock(HttpServletRequest.class);
+        final String forwardedForHeader = "fc00:0:0:0:0:0:0:4, 192.168.0.6, 10.0.0.8, 172.16.0.9";
+        Mockito.doReturn(forwardedForHeader).when(mockRequest).getHeader(IPAuthenticationProvider.X_FORWARDED_FOR_HEADER_NAME);
+        Mockito.doReturn("192.168.0.5").when(mockRequest).getRemoteAddr();
+
+        // validate it returns the leftmost IP from the X-Forwarded-For header
+        final List<Id> expectedIds = Arrays.asList(new Id(provider.getScheme(), "fc00:0:0:0:0:0:0:4"));
+        assertEquals(expectedIds, provider.handleAuthentication(mockRequest, null));
+    }
+
+    @Test
+    public void testHandleAuthentication_NoForwarded() {
+        final IPAuthenticationProvider provider = new IPAuthenticationProvider();
+
+        final HttpServletRequest mockRequest = mock(HttpServletRequest.class);
+        Mockito.doReturn(null).when(mockRequest).getHeader(IPAuthenticationProvider.X_FORWARDED_FOR_HEADER_NAME);
+        Mockito.doReturn("192.168.0.6").when(mockRequest).getRemoteAddr();
+
+        // validate it returns the remote address
+        final List<Id> expectedIds = Arrays.asList(new Id(provider.getScheme(), "192.168.0.6"));
+        assertEquals(expectedIds, provider.handleAuthentication(mockRequest, null));
+    }
+}
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
index edde2b5da..f6ad52ce5 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/ObserverMasterTest.java
@@ -293,7 +293,7 @@ public class ObserverMasterTest extends ObserverMasterTestBase {
 
         // test stats collection
         final Map<String, String> emptyMap = Collections.emptyMap();
-        Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap).toMap();
+        Map<String, Object> stats = Commands.runGetCommand("mntr", q3.getQuorumPeer().getActiveServer(), emptyMap, null, null).toMap();
         assertTrue(stats.containsKey("observer_master_id"), "observer not emitting observer_master_id");
 
         // check the stats for the first peer
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/test/X509AuthTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/test/X509AuthTest.java
index bb04a9fdd..65925ec89 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/test/X509AuthTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/test/X509AuthTest.java
@@ -19,6 +19,8 @@
 package org.apache.zookeeper.test;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 import java.math.BigInteger;
 import java.net.Socket;
 import java.security.InvalidKeyException;
@@ -35,16 +37,20 @@ import java.security.cert.CertificateNotYetValidException;
 import java.security.cert.X509Certificate;
 import java.util.Arrays;
 import java.util.Date;
+import java.util.List;
 import java.util.Set;
 import javax.net.ssl.X509KeyManager;
 import javax.net.ssl.X509TrustManager;
 import javax.security.auth.x500.X500Principal;
+import javax.servlet.http.HttpServletRequest;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.server.MockServerCnxn;
 import org.apache.zookeeper.server.auth.X509AuthenticationProvider;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
 
 public class X509AuthTest extends ZKTestCase {
 
@@ -69,6 +75,8 @@ public class X509AuthTest extends ZKTestCase {
         MockServerCnxn cnxn = new MockServerCnxn();
         cnxn.clientChain = new X509Certificate[]{clientCert};
         assertEquals(KeeperException.Code.OK, provider.handleAuthentication(cnxn, null));
+        final List<Id> ids = Arrays.asList(new Id("x509", "CN=CLIENT"));
+        assertEquals(ids, cnxn.getAuthInfo());
     }
 
     @Test
@@ -77,7 +85,8 @@ public class X509AuthTest extends ZKTestCase {
         MockServerCnxn cnxn = new MockServerCnxn();
         cnxn.clientChain = new X509Certificate[]{superCert};
         assertEquals(KeeperException.Code.OK, provider.handleAuthentication(cnxn, null));
-        assertEquals("super", cnxn.getAuthInfo().get(0).getScheme());
+        final List<Id> ids = Arrays.asList(new Id("super", "CN=SUPER"), new Id("x509", "CN=SUPER"));
+        assertEquals(ids, cnxn.getAuthInfo());
     }
 
     @Test
@@ -88,6 +97,32 @@ public class X509AuthTest extends ZKTestCase {
         assertEquals(KeeperException.Code.AUTHFAILED, provider.handleAuthentication(cnxn, null));
     }
 
+    @Test
+    public void testTrustedAuth_HttpServletRequest() {
+        final X509AuthenticationProvider provider = createProvider(clientCert);
+        final HttpServletRequest mockRequest =  mock(HttpServletRequest.class);
+        Mockito.doReturn(new X509Certificate[]{clientCert}).when(mockRequest).getAttribute(X509AuthenticationProvider.X509_CERTIFICATE_ATTRIBUTE_NAME);
+        final List<Id> ids = Arrays.asList(new Id("x509", "CN=CLIENT"));
+        assertEquals(ids, provider.handleAuthentication(mockRequest, null));
+    }
+
+    @Test
+    public void testSuperAuth_HttpServletRequest() {
+        final X509AuthenticationProvider provider = createProvider(superCert);
+        final HttpServletRequest mockRequest =  mock(HttpServletRequest.class);
+        Mockito.doReturn(new X509Certificate[]{superCert}).when(mockRequest).getAttribute(X509AuthenticationProvider.X509_CERTIFICATE_ATTRIBUTE_NAME);
+        final List<Id> ids = Arrays.asList(new Id("super", "CN=SUPER"), new Id("x509", "CN=SUPER"));
+        assertEquals(ids, provider.handleAuthentication(mockRequest, null));
+    }
+
+    @Test
+    public void testUntrustedAuth_HttpServletRequest() {
+        final X509AuthenticationProvider provider = createProvider(clientCert);
+        final HttpServletRequest mockRequest =  mock(HttpServletRequest.class);
+        Mockito.doReturn(new X509Certificate[]{unknownCert}).when(mockRequest).getAttribute(X509AuthenticationProvider.X509_CERTIFICATE_ATTRIBUTE_NAME);
+        assertTrue(provider.handleAuthentication(mockRequest, null).isEmpty());
+    }
+
     private static class TestPublicKey implements PublicKey {
 
         private static final long serialVersionUID = 1L;