You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by mo...@apache.org on 2017/12/14 21:13:11 UTC

[24/49] knox git commit: KNOX-1129 - Remote Configuration Monitor Should Define The Entries It Monitors If They're Not Yet Defined (Phil Zampino via lmccay)

KNOX-1129 - Remote Configuration Monitor Should Define The Entries It Monitors If They're Not Yet Defined (Phil Zampino via lmccay)

Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/e482e2e9
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/e482e2e9
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/e482e2e9

Branch: refs/heads/KNOX-998-Package_Restructuring
Commit: e482e2e93687d8601f94f8134544ebe1e0a04108
Parents: 828ea38
Author: Larry McCay <lm...@hortonworks.com>
Authored: Mon Dec 4 17:15:57 2017 -0500
Committer: Larry McCay <lm...@hortonworks.com>
Committed: Mon Dec 4 17:16:44 2017 -0500

----------------------------------------------------------------------
 .../apache/hadoop/gateway/GatewayMessages.java  |   8 +
 .../DefaultRemoteConfigurationMonitor.java      |  63 +++++
 ...emoteConfigurationRegistryClientService.java |  20 ++
 .../remote/RemoteConfigurationMessages.java     |   3 +
 .../config/remote/zk/CuratorClientService.java  |  48 +++-
 .../RemoteConfigurationRegistryClient.java      |   6 +
 .../monitor/RemoteConfigurationMonitorTest.java | 260 +++++++++++++++++--
 7 files changed, 377 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
index d78ef71..ab0ab39 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/GatewayMessages.java
@@ -588,4 +588,12 @@ public interface GatewayMessages {
   @Message( level = MessageLevel.DEBUG, text = "Removed descriptor {0} reference to provider configuration {1}." )
   void removedProviderConfigurationReference(String descriptorName, String providerConfigurationName);
 
+  @Message( level = MessageLevel.WARN,
+            text = "The permissions for the remote configuration registry entry \"{0}\" are such that its content may not be trustworthy." )
+  void suspectWritableRemoteConfigurationEntry(String entryPath);
+
+  @Message( level = MessageLevel.WARN,
+            text = "Correcting the suspect permissions for the remote configuration registry entry \"{0}\"." )
+  void correctingSuspectWritableRemoteConfigurationEntry(String entryPath);
+
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
index 03bbf16..af60058 100644
--- a/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
+++ b/gateway-server/src/main/java/org/apache/hadoop/gateway/topology/monitor/DefaultRemoteConfigurationMonitor.java
@@ -24,9 +24,12 @@ import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegis
 import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient.EntryListener;
 import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient;
 import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService;
+import org.apache.zookeeper.ZooDefs;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 
@@ -39,6 +42,32 @@ class DefaultRemoteConfigurationMonitor implements RemoteConfigurationMonitor {
 
     private static GatewayMessages log = MessagesFactory.get(GatewayMessages.class);
 
+    // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported
+    private static final RemoteConfigurationRegistryClient.EntryACL AUTHENTICATED_USERS_ALL; // ACL entry applied by ensureEntry() in place of a suspect ACL
+    static {
+        AUTHENTICATED_USERS_ALL = new RemoteConfigurationRegistryClient.EntryACL() {
+            public String getId() {
+                return ""; // empty id; paired with the "auth" type returned by getType()
+            }
+
+            public String getType() {
+                return "auth";
+            }
+
+            public Object getPermissions() {
+                return ZooDefs.Perms.ALL; // the ZooKeeper "all permissions" constant
+            }
+
+            public boolean canRead() {
+                return true;
+            }
+
+            public boolean canWrite() {
+                return true;
+            }
+        };
+    }
+
     private RemoteConfigurationRegistryClient client = null;
 
     private File providersDir;
@@ -75,6 +104,9 @@ class DefaultRemoteConfigurationMonitor implements RemoteConfigurationMonitor {
         final String monitorSource = client.getAddress();
         log.startingRemoteConfigurationMonitor(monitorSource);
 
+        // Ensure the existence of the expected entries and their associated ACLs
+        ensureEntries();
+
         // Confirm access to the remote provider configs directory znode
         List<String> providerConfigs = client.listChildEntries(NODE_KNOX_PROVIDERS);
         if (providerConfigs == null) {
@@ -105,6 +137,37 @@ class DefaultRemoteConfigurationMonitor implements RemoteConfigurationMonitor {
         client.removeEntryListener(NODE_KNOX_DESCRIPTORS);
     }
 
+    private void ensureEntries() { // runs ensureEntry() for each registry entry this monitor relies on
+        ensureEntry(NODE_KNOX); // NOTE(review): presumably ordered parent-first — confirm against the NODE_* definitions
+        ensureEntry(NODE_KNOX_CONFIG);
+        ensureEntry(NODE_KNOX_PROVIDERS);
+        ensureEntry(NODE_KNOX_DESCRIPTORS);
+    }
+
+    private void ensureEntry(String name) { // creates the named entry when absent; otherwise inspects its existing ACL
+        if (!client.entryExists(name)) {
+            client.createEntry(name);
+        } else {
+            // Validate the ACL of the pre-existing entry
+            List<RemoteConfigurationRegistryClient.EntryACL> entryACLs = client.getACL(name);
+            for (RemoteConfigurationRegistryClient.EntryACL entryACL : entryACLs) {
+                // N.B. This is ZooKeeper-specific, and should be abstracted when another registry is supported
+                // For now, check for ZooKeeper world:anyone with ANY permissions (even read-only)
+                if (entryACL.getType().equals("world") && entryACL.getId().equals("anyone")) {
+                    log.suspectWritableRemoteConfigurationEntry(name);
+
+                    // If the client is authenticated, but "anyone" can write the content, then the content may not
+                    // be trustworthy. Without configured authentication only the warning above is logged.
+                    if (client.isAuthenticationConfigured()) {
+                        log.correctingSuspectWritableRemoteConfigurationEntry(name);
+
+                        // Replace the existing ACL with the single AUTHENTICATED_USERS_ALL entry ("auth" type, all permissions)
+                        client.setACL(name, Collections.singletonList(AUTHENTICATED_USERS_ALL));
+                  }
+                }
+            }
+        }
+    }
 
     private static class ConfigDirChildEntryListener implements ChildEntryListener {
         File localDir;

http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-server/src/test/java/org/apache/hadoop/gateway/service/config/remote/LocalFileSystemRemoteConfigurationRegistryClientService.java
----------------------------------------------------------------------
diff --git a/gateway-server/src/test/java/org/apache/hadoop/gateway/service/config/remote/LocalFileSystemRemoteConfigurationRegistryClientService.java b/gateway-server/src/test/java/org/apache/hadoop/gateway/service/config/remote/LocalFileSystemRemoteConfigurationRegistryClientService.java
index 161c201..0bfc39a 100644
--- a/gateway-server/src/test/java/org/apache/hadoop/gateway/service/config/remote/LocalFileSystemRemoteConfigurationRegistryClientService.java
+++ b/gateway-server/src/test/java/org/apache/hadoop/gateway/service/config/remote/LocalFileSystemRemoteConfigurationRegistryClientService.java
@@ -129,6 +129,16 @@ public class LocalFileSystemRemoteConfigurationRegistryClientService implements
                             public Object getPermissions() {
                                 return collected.get(id).toString();
                             }
+
+                            @Override
+                            public boolean canRead() {
+                                return true;
+                            }
+
+                            @Override
+                            public boolean canWrite() {
+                                return true;
+                            }
                         };
                         result.add(acl);
                     }
@@ -216,6 +226,16 @@ public class LocalFileSystemRemoteConfigurationRegistryClientService implements
             }
 
             @Override
+            public boolean isAuthenticationConfigured() {
+                return false;
+            }
+
+            @Override
+            public void setACL(String path, List<EntryACL> acls) {
+                // Intentionally a no-op in this test client: the supplied ACLs are ignored
+            }
+
+            @Override
             public void deleteEntry(String path) {
                 File entry = new File(root, path);
                 if (entry.exists()) {

http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/RemoteConfigurationMessages.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/RemoteConfigurationMessages.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/RemoteConfigurationMessages.java
index 22e622d..7cd1324 100644
--- a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/RemoteConfigurationMessages.java
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/RemoteConfigurationMessages.java
@@ -42,5 +42,8 @@ public interface RemoteConfigurationMessages {
     void errorHandlingRemoteConfigACL(final String path,
                                       @StackTrace(level = MessageLevel.DEBUG) Exception e);
 
+    @Message(level = MessageLevel.ERROR, text = "An error occurred setting the ACL for remote configuration {0} : {1}")
+    void errorSettingEntryACL(final String path,
+                              @StackTrace(level = MessageLevel.DEBUG) Exception e);
 
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
index 0908252..0000f48 100644
--- a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
@@ -45,6 +45,7 @@ import org.apache.zookeeper.data.Stat;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -115,7 +116,7 @@ class CuratorClientService implements ZooKeeperClientService {
         ACLProvider aclProvider;
         if (config.isSecureRegistry()) {
             configureSasl(config);
-            aclProvider = new SASLOwnerACLProvider(config.getPrincipal());
+            aclProvider = new SASLOwnerACLProvider();
         } else {
             // Clear SASL system property
             System.clearProperty(LOGIN_CONTEXT_NAME_PROPERTY);
@@ -163,6 +164,11 @@ class CuratorClientService implements ZooKeeperClientService {
         }
 
         @Override
+        public boolean isAuthenticationConfigured() {
+            return config.isSecureRegistry();
+        }
+
+        @Override
         public boolean entryExists(String path) {
             Stat s = null;
             try {
@@ -191,6 +197,30 @@ class CuratorClientService implements ZooKeeperClientService {
         }
 
+        @Override
+        public void setACL(String path, List<EntryACL> entryACLs) { // failures are logged, not thrown (see the catch below)
+            // Translate the abstract ACLs into ZooKeeper ACLs
+            List<ACL> delegateACLs = new ArrayList<>();
+            for (EntryACL entryACL : entryACLs) {
+                String scheme = entryACL.getType();
+                String id = entryACL.getId();
+                int permissions = 0; // stays 0 when the entry reports neither write nor read access
+                if (entryACL.canWrite()) {
+                    permissions = ZooDefs.Perms.ALL; // write access is translated to ALL, not only Perms.WRITE
+                } else if (entryACL.canRead()){
+                    permissions = ZooDefs.Perms.READ;
+                }
+                delegateACLs.add(new ACL(permissions, new Id(scheme, id)));
+            }
+
+            try {
+                // Set the ACLs for the path
+                delegate.setACL().withACL(delegateACLs).forPath(path);
+            } catch (Exception e) {
+                log.errorSettingEntryACL(path, e); // the error is logged here and not propagated to the caller
+            }
+        }
+
+        @Override
         public List<String> listChildEntries(String path) {
             List<String> result = null;
             try {
@@ -305,8 +335,8 @@ class CuratorClientService implements ZooKeeperClientService {
 
         private final List<ACL> saslACL;
 
-        private SASLOwnerACLProvider(String principal) {
-            this.saslACL = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("sasl", principal)));
+        private SASLOwnerACLProvider() {
+            this.saslACL = ZooDefs.Ids.CREATOR_ALL_ACL; // NOTE(review): per ZooKeeper docs this grants ALL to the creator's authenticated id(s), not to every authenticated user — confirm
+        }
 
         @Override
@@ -396,7 +426,7 @@ class CuratorClientService implements ZooKeeperClientService {
     private static final class ZooKeeperACLAdapter implements RemoteConfigurationRegistryClient.EntryACL {
         private String type;
         private String id;
-        private Object permissions;
+        private int permissions;
 
         ZooKeeperACLAdapter(ACL acl) {
             this.permissions = acl.getPerms();
@@ -418,6 +448,16 @@ class CuratorClientService implements ZooKeeperClientService {
         public Object getPermissions() {
             return permissions;
         }
+
+        @Override
+        public boolean canRead() { // ZooDefs.Perms values are bit flags: test the READ bit, since a magnitude test reports WRITE-only ACLs as readable
+            return (permissions & ZooDefs.Perms.READ) != 0;
+        }
+
+        @Override
+        public boolean canWrite() { // ZooDefs.Perms values are bit flags: test the WRITE bit, since a magnitude test reports CREATE/DELETE/ADMIN-only ACLs as writable
+            return (permissions & ZooDefs.Perms.WRITE) != 0;
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/config/client/RemoteConfigurationRegistryClient.java
----------------------------------------------------------------------
diff --git a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/config/client/RemoteConfigurationRegistryClient.java b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/config/client/RemoteConfigurationRegistryClient.java
index 6fbf410..bfb4518 100644
--- a/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/config/client/RemoteConfigurationRegistryClient.java
+++ b/gateway-spi/src/main/java/org/apache/hadoop/gateway/services/config/client/RemoteConfigurationRegistryClient.java
@@ -22,10 +22,14 @@ public interface RemoteConfigurationRegistryClient {
 
     String getAddress();
 
+    boolean isAuthenticationConfigured();
+
     boolean entryExists(String path);
 
     List<EntryACL> getACL(String path);
 
+    void setACL(String path, List<EntryACL> acls);
+
     List<String> listChildEntries(String path);
 
     String getEntryData(String path);
@@ -69,6 +73,8 @@ public interface RemoteConfigurationRegistryClient {
         String getId();
         String getType();
         Object getPermissions();
+        boolean canRead();
+        boolean canWrite();
     }
 
 }

http://git-wip-us.apache.org/repos/asf/knox/blob/e482e2e9/gateway-test/src/test/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorTest.java
----------------------------------------------------------------------
diff --git a/gateway-test/src/test/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorTest.java b/gateway-test/src/test/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorTest.java
index 14d98a9..dd75028 100644
--- a/gateway-test/src/test/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorTest.java
+++ b/gateway-test/src/test/java/org/apache/hadoop/gateway/topology/monitor/RemoteConfigurationMonitorTest.java
@@ -33,13 +33,16 @@ import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.easymock.EasyMock;
+import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.FileWriter;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -48,6 +51,7 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -66,9 +70,16 @@ public class RemoteConfigurationMonitorTest {
     private static final String PATH_KNOX_PROVIDERS = PATH_KNOX_CONFIG + "/shared-providers";
     private static final String PATH_KNOX_DESCRIPTORS = PATH_KNOX_CONFIG + "/descriptors";
 
+    private static final String PATH_AUTH_TEST = "/auth_test/child_node";
+
+
+    private static final String ALT_USERNAME = "notyou";
     private static final String ZK_USERNAME = "testsasluser";
     private static final String ZK_PASSWORD = "testsaslpwd";
 
+    private static final ACL ANY_AUTHENTICATED_USER_ALL = new ACL(ZooDefs.Perms.ALL, new Id("auth", ""));
+    private static final ACL SASL_TESTUSER_ALL = new ACL(ZooDefs.Perms.ALL, new Id("sasl", ZK_USERNAME));
+
     private static File testTmp;
     private static File providersDir;
     private static File descriptorsDir;
@@ -80,13 +91,36 @@ public class RemoteConfigurationMonitorTest {
     @BeforeClass
     public static void setupSuite() throws Exception {
         testTmp = TestUtils.createTempDir(RemoteConfigurationMonitorTest.class.getName());
-        File confDir   = TestUtils.createTempDir(testTmp + "/conf");
-        providersDir   = TestUtils.createTempDir(confDir + "/shared-providers");
+        File confDir = TestUtils.createTempDir(testTmp + "/conf");
+        providersDir = TestUtils.createTempDir(confDir + "/shared-providers");
         descriptorsDir = TestUtils.createTempDir(confDir + "/descriptors");
+    }
 
+    @AfterClass
+    public static void tearDownSuite() throws Exception {
+        // Delete the working dir
+        testTmp.delete(); // NOTE(review): File.delete() fails on a non-empty directory and setupSuite() creates sub-directories here — confirm cleanup
+    }
+
+    @Before
+    public void setupTest() throws Exception {
         configureAndStartZKCluster();
     }
 
+    @After
+    public void tearDownTest() throws Exception {
+        // Clean up the ZK nodes, and close the client
+        if (client != null) {
+            if (client.checkExists().forPath(PATH_KNOX) != null) {
+                client.delete().deletingChildrenIfNeeded().forPath(PATH_KNOX);
+            }
+            client.close();
+        }
+
+        // Shutdown the ZK cluster
+        zkCluster.close();
+    }
+
     /**
      * Create and persist a JAAS configuration file, defining the SASL config for both the ZooKeeper cluster instances
      * and ZooKeeper clients.
@@ -102,7 +136,7 @@ public class RemoteConfigurationMonitorTest {
         fw.write("Server {\n" +
                 "    org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
                 "    user_" + username + " =\"" + password + "\";\n" +
-                "};\n"+
+                "};\n" +
                 "Client {\n" +
                 "    org.apache.zookeeper.server.auth.DigestLoginModule required\n" +
                 "    username=\"" + username + "\"\n" +
@@ -141,42 +175,187 @@ public class RemoteConfigurationMonitorTest {
 
         // Create the client for the test cluster
         client = CuratorFrameworkFactory.builder()
-                .connectString(zkCluster.getConnectString())
-                .retryPolicy(new ExponentialBackoffRetry(100, 3))
-                .build();
+                                        .connectString(zkCluster.getConnectString())
+                                        .retryPolicy(new ExponentialBackoffRetry(100, 3))
+                                        .build();
         assertNotNull(client);
         client.start();
 
-        // Create the knox config paths with an ACL for the sasl user configured for the client
-        List<ACL> acls = new ArrayList<>();
-        acls.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", ZK_USERNAME)));
+        // Create test config nodes with an ACL for a sasl user that is NOT configured for the test client
+        List<ACL> acls = Arrays.asList(new ACL(ZooDefs.Perms.ALL, new Id("sasl", ALT_USERNAME)),
+                                       new ACL(ZooDefs.Perms.READ, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_AUTH_TEST);
+        assertNotNull("Failed to create node:" + PATH_AUTH_TEST,
+                      client.checkExists().forPath(PATH_AUTH_TEST));
+    }
 
-        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_DESCRIPTORS);
-        assertNotNull("Failed to create node:" + PATH_KNOX_DESCRIPTORS,
-                client.checkExists().forPath(PATH_KNOX_DESCRIPTORS));
-        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_PROVIDERS);
-        assertNotNull("Failed to create node:" + PATH_KNOX_PROVIDERS,
-                client.checkExists().forPath(PATH_KNOX_PROVIDERS));
+
+    private static void validateKnoxConfigNodeACLs(List<ACL> expectedACLS, List<ACL> actualACLs) throws Exception { // order-insensitive ACL comparison
+        assertEquals(expectedACLS.size(), actualACLs.size());
+        int matchedCount = 0; // number of expected ACLs for which an actual ACL with the same scheme and id was found
+        for (ACL expected : expectedACLS) {
+            for (ACL actual : actualACLs) {
+                Id expectedId = expected.getId();
+                Id actualId = actual.getId();
+                if (actualId.getScheme().equals(expectedId.getScheme()) && actualId.getId().equals(expectedId.getId())) {
+                    matchedCount++;
+                    assertEquals(expected.getPerms(), actual.getPerms()); // same identity must carry the same permission bits
+                    break; // only the first actual ACL with a matching identity is compared
+                }
+            }
+        }
+        assertEquals("ACL mismatch despite being same quantity.", expectedACLS.size(), matchedCount);
+    }
 
-    @AfterClass
-    public static void tearDownSuite() throws Exception {
-        // Clean up the ZK nodes, and close the client
-        if (client != null) {
-            client.delete().deletingChildrenIfNeeded().forPath(PATH_KNOX);
-            client.close();
+
+    @Test
+    public void testZooKeeperConfigMonitorSASLNodesExistWithUnacceptableACL() throws Exception {
+        final String configMonitorName = "zkConfigClient";
+        final String alias = "zkPass";
+
+        // Setup the base GatewayConfig mock
+        GatewayConfig gc = EasyMock.createNiceMock(GatewayConfig.class);
+        EasyMock.expect(gc.getGatewayProvidersConfigDir()).andReturn(providersDir.getAbsolutePath()).anyTimes();
+        EasyMock.expect(gc.getGatewayDescriptorsDir()).andReturn(descriptorsDir.getAbsolutePath()).anyTimes();
+        EasyMock.expect(gc.getRemoteRegistryConfigurationNames())
+                .andReturn(Collections.singletonList(configMonitorName))
+                .anyTimes();
+        final String registryConfig =
+                GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString() + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL + "=" + ZK_USERNAME + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE + "=Digest;" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS + "=" + alias;
+        EasyMock.expect(gc.getRemoteRegistryConfiguration(configMonitorName))
+                .andReturn(registryConfig).anyTimes();
+        EasyMock.expect(gc.getRemoteConfigurationMonitorClientName()).andReturn(configMonitorName).anyTimes();
+        EasyMock.replay(gc);
+
+        AliasService aliasService = EasyMock.createNiceMock(AliasService.class);
+        EasyMock.expect(aliasService.getPasswordFromAliasForGateway(alias))
+                .andReturn(ZK_PASSWORD.toCharArray())
+                .anyTimes();
+        EasyMock.replay(aliasService);
+
+        RemoteConfigurationRegistryClientService clientService = (new ZooKeeperClientServiceProvider()).newInstance();
+        clientService.setAliasService(aliasService);
+        clientService.init(gc, Collections.emptyMap());
+        clientService.start();
+
+        RemoteConfigurationMonitorFactory.setClientService(clientService);
+
+        RemoteConfigurationMonitor cm = RemoteConfigurationMonitorFactory.get(gc);
+        assertNotNull("Failed to load RemoteConfigurationMonitor", cm);
+
+        final ACL ANY_AUTHENTICATED_USER_ALL = new ACL(ZooDefs.Perms.ALL, new Id("auth", ""));
+        List<ACL> acls = Arrays.asList(ANY_AUTHENTICATED_USER_ALL, new ACL(ZooDefs.Perms.WRITE, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_CONFIG);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_PROVIDERS);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_DESCRIPTORS);
+
+        // Make sure both ACLs were applied
+        List<ACL> preACLs = client.getACL().forPath(PATH_KNOX);
+        assertEquals(2, preACLs.size());
+
+        // Check that the config nodes really do exist (the monitor will NOT create them if they're present)
+        assertNotNull(client.checkExists().forPath(PATH_KNOX));
+        assertNotNull(client.checkExists().forPath(PATH_KNOX_CONFIG));
+        assertNotNull(client.checkExists().forPath(PATH_KNOX_PROVIDERS));
+        assertNotNull(client.checkExists().forPath(PATH_KNOX_DESCRIPTORS));
+
+        try {
+            cm.start();
+        } catch (Exception e) {
+            fail("Failed to start monitor: " + e.getMessage());
         }
 
-        // Shutdown the ZK cluster
-        zkCluster.close();
+        // Validate the expected ACLs on the Knox config znodes (make sure the monitor removed the world:anyone ACL)
+        List<ACL> expectedACLs = Collections.singletonList(SASL_TESTUSER_ALL);
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_CONFIG));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_PROVIDERS));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_DESCRIPTORS));
+    }
 
-        // Delete the working dir
-        testTmp.delete();
+
+    @Test
+    public void testZooKeeperConfigMonitorSASLNodesExistWithAcceptableACL() throws Exception {
+        final String configMonitorName = "zkConfigClient";
+        final String alias = "zkPass";
+
+        // Setup the base GatewayConfig mock
+        GatewayConfig gc = EasyMock.createNiceMock(GatewayConfig.class);
+        EasyMock.expect(gc.getGatewayProvidersConfigDir()).andReturn(providersDir.getAbsolutePath()).anyTimes();
+        EasyMock.expect(gc.getGatewayDescriptorsDir()).andReturn(descriptorsDir.getAbsolutePath()).anyTimes();
+        EasyMock.expect(gc.getRemoteRegistryConfigurationNames())
+                .andReturn(Collections.singletonList(configMonitorName))
+                .anyTimes();
+        final String registryConfig =
+                GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString() + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL + "=" + ZK_USERNAME + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE + "=Digest;" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS + "=" + alias;
+        EasyMock.expect(gc.getRemoteRegistryConfiguration(configMonitorName))
+                .andReturn(registryConfig).anyTimes();
+        EasyMock.expect(gc.getRemoteConfigurationMonitorClientName()).andReturn(configMonitorName).anyTimes();
+        EasyMock.replay(gc);
+
+        AliasService aliasService = EasyMock.createNiceMock(AliasService.class);
+        EasyMock.expect(aliasService.getPasswordFromAliasForGateway(alias))
+                .andReturn(ZK_PASSWORD.toCharArray())
+                .anyTimes();
+        EasyMock.replay(aliasService);
+
+        RemoteConfigurationRegistryClientService clientService = (new ZooKeeperClientServiceProvider()).newInstance();
+        clientService.setAliasService(aliasService);
+        clientService.init(gc, Collections.emptyMap());
+        clientService.start();
+
+        RemoteConfigurationMonitorFactory.setClientService(clientService);
+
+        RemoteConfigurationMonitor cm = RemoteConfigurationMonitorFactory.get(gc);
+        assertNotNull("Failed to load RemoteConfigurationMonitor", cm);
+
+        List<ACL> acls = Arrays.asList(ANY_AUTHENTICATED_USER_ALL);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_CONFIG);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_PROVIDERS);
+        client.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT).withACL(acls).forPath(PATH_KNOX_DESCRIPTORS);
+
+        // Check that the config nodes really do exist (the monitor will NOT create them if they're present)
+        assertNotNull(client.checkExists().forPath(PATH_KNOX));
+        assertNotNull(client.checkExists().forPath(PATH_KNOX_CONFIG));
+        assertNotNull(client.checkExists().forPath(PATH_KNOX_PROVIDERS));
+        assertNotNull(client.checkExists().forPath(PATH_KNOX_DESCRIPTORS));
+
+        try {
+            cm.start();
+        } catch (Exception e) {
+            fail("Failed to start monitor: " + e.getMessage());
+        }
+
+        // Test auth violation
+        clientService.get(configMonitorName).createEntry("/auth_test/child_node/test1");
+        assertNull("Creation should have been prevented since write access is not granted to the test client.",
+                client.checkExists().forPath("/auth_test/child_node/test1"));
+        assertTrue("Creation should have been prevented since write access is not granted to the test client.",
+                client.getChildren().forPath("/auth_test/child_node").isEmpty());
+
+        // Validate the expected ACLs on the Knox config znodes (make sure the monitor didn't change them)
+        List<ACL> expectedACLs = Collections.singletonList(SASL_TESTUSER_ALL);
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_CONFIG));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_PROVIDERS));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_DESCRIPTORS));
     }
 
+
     @Test
-    public void testZooKeeperConfigMonitorSASL() throws Exception {
+    public void testZooKeeperConfigMonitorSASLCreateNodes() throws Exception {
         final String configMonitorName = "zkConfigClient";
+        final String alias = "zkPass";
 
         // Setup the base GatewayConfig mock
         GatewayConfig gc = EasyMock.createNiceMock(GatewayConfig.class);
@@ -187,13 +366,19 @@ public class RemoteConfigurationMonitorTest {
                 .anyTimes();
         final String registryConfig =
                             GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
-                            GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString();
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString() + ";" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL + "=" + ZK_USERNAME + ";" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE + "=Digest;" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS + "=" + alias;
         EasyMock.expect(gc.getRemoteRegistryConfiguration(configMonitorName))
                 .andReturn(registryConfig).anyTimes();
         EasyMock.expect(gc.getRemoteConfigurationMonitorClientName()).andReturn(configMonitorName).anyTimes();
         EasyMock.replay(gc);
 
         AliasService aliasService = EasyMock.createNiceMock(AliasService.class);
+        EasyMock.expect(aliasService.getPasswordFromAliasForGateway(alias))
+                .andReturn(ZK_PASSWORD.toCharArray())
+                .anyTimes();
         EasyMock.replay(aliasService);
 
         RemoteConfigurationRegistryClientService clientService = (new ZooKeeperClientServiceProvider()).newInstance();
@@ -206,12 +391,33 @@ public class RemoteConfigurationMonitorTest {
         RemoteConfigurationMonitor cm = RemoteConfigurationMonitorFactory.get(gc);
         assertNotNull("Failed to load RemoteConfigurationMonitor", cm);
 
+        // Check that the config nodes really don't yet exist (the monitor will create them if they're not present)
+        assertNull(client.checkExists().forPath(PATH_KNOX));
+        assertNull(client.checkExists().forPath(PATH_KNOX_CONFIG));
+        assertNull(client.checkExists().forPath(PATH_KNOX_PROVIDERS));
+        assertNull(client.checkExists().forPath(PATH_KNOX_DESCRIPTORS));
+
         try {
             cm.start();
         } catch (Exception e) {
             fail("Failed to start monitor: " + e.getMessage());
         }
 
+        // Test auth violation
+        clientService.get(configMonitorName).createEntry("/auth_test/child_node/test1");
+        assertNull("Creation should have been prevented since write access is not granted to the test client.",
+                   client.checkExists().forPath("/auth_test/child_node/test1"));
+        assertTrue("Creation should have been prevented since write access is not granted to the test client.",
+                   client.getChildren().forPath("/auth_test/child_node").isEmpty());
+
+        // Validate the expected ACLs on the Knox config znodes (make sure the monitor created them correctly)
+        List<ACL> expectedACLs = Collections.singletonList(SASL_TESTUSER_ALL);
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_CONFIG));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_PROVIDERS));
+        validateKnoxConfigNodeACLs(expectedACLs, client.getACL().forPath(PATH_KNOX_DESCRIPTORS));
+
+        // Test the Knox config nodes, for which authentication should be sufficient for access
         try {
             final String pc_one_znode = getProviderPath("providers-config1.xml");
             final File pc_one         = new File(providersDir, "providers-config1.xml");