You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@knox.apache.org by lm...@apache.org on 2017/11/27 17:46:49 UTC

[2/3] knox git commit: KNOX-1107 - Remote Configuration Registry Client Service (Phil Zampino via lmccay)

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesAccessor.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesAccessor.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesAccessor.java
new file mode 100644
index 0000000..9fed589
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesAccessor.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.config;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryConfig;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Resolves the remote configuration registry definitions that apply to the gateway.
+ * <p>
+ * Definitions come from exactly one of two sources: an external XML file referenced by the
+ * {@code org.apache.knox.gateway.remote.registry.config.file} system property, or, when that property
+ * does not point at a regular file, the gateway configuration itself.
+ */
+public class RemoteConfigurationRegistriesAccessor {
+
+    // System property for specifying a reference to an XML configuration external to the gateway config
+    private static final String XML_CONFIG_REFERENCE_SYSTEM_PROPERTY_NAME =
+                                                                "org.apache.knox.gateway.remote.registry.config.file";
+
+
+    /**
+     * Collects the remote registry configurations.
+     *
+     * @param gatewayConfig the gateway configuration used as the fallback source when no usable external
+     *                      XML file is referenced by the system property
+     * @return a new, mutable list of registry configurations; possibly empty, never {@code null}
+     */
+    public static List<RemoteConfigurationRegistryConfig> getRemoteRegistryConfigurations(GatewayConfig gatewayConfig) {
+        List<RemoteConfigurationRegistryConfig> result = new ArrayList<>();
+
+        // First check for the system property pointing to a valid XML config for the remote registries
+        String remoteConfigRegistryConfigFilename = System.getProperty(XML_CONFIG_REFERENCE_SYSTEM_PROPERTY_NAME);
+
+        // Only a regular file qualifies. A directory (or any other non-file path) also "exists", but it can
+        // never be parsed, and accepting it would suppress the gateway-config fallback below.
+        if (remoteConfigRegistryConfigFilename != null && new File(remoteConfigRegistryConfigFilename).isFile()) {
+            // Parse the file, and build the registry config set
+            result.addAll(RemoteConfigurationRegistriesParser.getConfig(remoteConfigRegistryConfigFilename));
+        } else {
+            // The system property was not set to a valid reference to another config file, so derive the
+            // registry configurations from the gateway config.
+            RemoteConfigurationRegistries remoteConfigRegistries =
+                                                            new DefaultRemoteConfigurationRegistries(gatewayConfig);
+            result.addAll(remoteConfigRegistries.getRegistryConfigurations());
+        }
+
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesParser.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesParser.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesParser.java
new file mode 100644
index 0000000..3ea71ef
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistriesParser.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.config;
+
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryConfig;
+
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Reads remote configuration registry definitions from an XML file using JAXB.
+ */
+class RemoteConfigurationRegistriesParser {
+
+    private static final Logger LOG = Logger.getLogger(RemoteConfigurationRegistriesParser.class.getName());
+
+    /**
+     * Parses the referenced XML file into registry configurations.
+     * <p>
+     * Parsing is best-effort: a file that cannot be unmarshalled is reported through the logger and yields
+     * an empty list rather than an exception.
+     *
+     * @param configFilename path of the XML file to parse
+     * @return a new, mutable list of the parsed registry configurations; empty when the file name is
+     *         {@code null}, the file cannot be parsed, or it declares no registries
+     */
+    static List<RemoteConfigurationRegistryConfig> getConfig(String configFilename) {
+        List<RemoteConfigurationRegistryConfig> result = new ArrayList<>();
+
+        if (configFilename == null) {
+            // Nothing to parse; new File(null) would otherwise fail with a NullPointerException.
+            return result;
+        }
+
+        File file = new File(configFilename);
+
+        try {
+            JAXBContext jaxbContext = JAXBContext.newInstance(RemoteConfigurationRegistries.class);
+            Unmarshaller jaxbUnmarshaller = jaxbContext.createUnmarshaller();
+            RemoteConfigurationRegistries parsedContent = (RemoteConfigurationRegistries) jaxbUnmarshaller.unmarshal(file);
+            // Guard the collection as well as the root object, so a document without registry entries
+            // cannot trigger a NullPointerException in addAll.
+            if (parsedContent != null && parsedContent.getRegistryConfigurations() != null) {
+                result.addAll(parsedContent.getRegistryConfigurations());
+            }
+        } catch (JAXBException e) {
+            // Report which file failed, keeping the cause, instead of dumping a bare stack trace.
+            LOG.log(Level.SEVERE,
+                    "Failed to parse the remote configuration registry definitions from " + configFilename,
+                    e);
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistry.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistry.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistry.java
new file mode 100644
index 0000000..f3e7dbd
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistry.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.config;
+
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryConfig;
+
+import javax.xml.bind.annotation.XmlElement;
+
+/**
+ * JAXB-bound description of a single remote configuration registry.
+ * <p>
+ * The {@code @XmlElement} annotations on the getters name the XML child elements; the values are applied
+ * through the setter whose bean property name matches the annotated getter.
+ */
+class RemoteConfigurationRegistry implements RemoteConfigurationRegistryConfig {
+
+    private String name;
+    private String type;
+    private String connectionString;
+    private String namespace;
+    private String authType;
+    private String principal;
+    private String credentialAlias;
+    private String keyTab;
+    private boolean useKeyTab;
+    private boolean useTicketCache;
+
+    RemoteConfigurationRegistry() {
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public void setRegistryType(String type) {
+        this.type = type;
+    }
+
+    public void setConnectionString(String connectionString) {
+        this.connectionString = connectionString;
+    }
+
+    public void setNamespace(String namespace) {
+        this.namespace = namespace;
+    }
+
+    public void setAuthType(String authType) {
+        this.authType = authType;
+    }
+
+    public void setPrincipal(String principal) {
+        this.principal = principal;
+    }
+
+    public void setCredentialAlias(String alias) {
+        this.credentialAlias = alias;
+    }
+
+    public void setUseTicketCache(boolean useTicketCache) {
+        this.useTicketCache = useTicketCache;
+    }
+
+    /**
+     * Sets whether a keytab is used.
+     * <p>
+     * This is the setter that pairs with {@link #isUseKeyTab()}: both resolve to the bean property
+     * {@code useKeyTab}, which lets the {@code use-key-tab} element be applied when unmarshalling.
+     *
+     * @param useKeyTab {@code true} to use a keytab
+     */
+    public void setUseKeyTab(boolean useKeyTab) {
+        this.useKeyTab = useKeyTab;
+    }
+
+    /**
+     * Sets whether a keytab is used.
+     * <p>
+     * Retained for existing callers. Its bean property name ({@code useKeytab}) differs in case from that
+     * of {@link #isUseKeyTab()}, so it does not pair with that getter; see {@link #setUseKeyTab(boolean)}.
+     *
+     * @param useKeytab {@code true} to use a keytab
+     */
+    public void setUseKeytab(boolean useKeytab) {
+        this.useKeyTab = useKeytab;
+    }
+
+    public void setKeytab(String keytab) {
+        this.keyTab = keytab;
+    }
+
+    @XmlElement(name="name")
+    public String getName() {
+        return name;
+    }
+
+    @XmlElement(name="type")
+    public String getRegistryType() {
+        return type;
+    }
+
+    @XmlElement(name="auth-type")
+    public String getAuthType() {
+        return authType;
+    }
+
+    @XmlElement(name="principal")
+    public String getPrincipal() {
+        return principal;
+    }
+
+    @XmlElement(name="credential-alias")
+    public String getCredentialAlias() {
+        return credentialAlias;
+    }
+
+    @Override
+    @XmlElement(name="address")
+    public String getConnectionString() {
+        return connectionString;
+    }
+
+    @Override
+    @XmlElement(name="namespace")
+    public String getNamespace() {
+        return namespace;
+    }
+
+    @Override
+    @XmlElement(name="use-ticket-cache")
+    public boolean isUseTicketCache() {
+        return useTicketCache;
+    }
+
+    @Override
+    @XmlElement(name="use-key-tab")
+    public boolean isUseKeyTab() {
+        return useKeyTab;
+    }
+
+    @Override
+    @XmlElement(name="keytab")
+    public String getKeytab() {
+        return keyTab;
+    }
+
+    /**
+     * @return {@code true} when an authentication type has been specified for this registry
+     */
+    @Override
+    public boolean isSecureRegistry() {
+        return (getAuthType() != null);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
new file mode 100644
index 0000000..0908252
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/CuratorClientService.java
@@ -0,0 +1,423 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.zk;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.ChildData;
+import org.apache.curator.framework.recipes.cache.NodeCache;
+import org.apache.curator.framework.recipes.cache.NodeCacheListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationMessages;
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient.ChildEntryListener;
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient.EntryListener;
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryConfig;
+import org.apache.hadoop.gateway.service.config.remote.config.RemoteConfigurationRegistriesAccessor;
+import org.apache.hadoop.gateway.services.ServiceLifecycleException;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.data.Stat;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RemoteConfigurationRegistryClientService implementation that employs the Curator ZooKeeper client framework.
+ */
+class CuratorClientService implements ZooKeeperClientService {
+
+    private static final String LOGIN_CONTEXT_NAME_PROPERTY = ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY;
+
+    private static final String DEFAULT_LOGIN_CONTEXT_NAME = "Client";
+
+    private static final RemoteConfigurationMessages log =
+                                                        MessagesFactory.get(RemoteConfigurationMessages.class);
+
+    private Map<String, RemoteConfigurationRegistryClient> clients = new HashMap<>();
+
+    private AliasService aliasService = null;
+
+
+    @Override
+    public void init(GatewayConfig config, Map<String, String> options) throws ServiceLifecycleException {
+
+        List<RemoteConfigurationRegistryConfig> registryConfigs = new ArrayList<>();
+
+        // Load the remote registry configurations
+        registryConfigs.addAll(RemoteConfigurationRegistriesAccessor.getRemoteRegistryConfigurations(config));
+
+        // Configure registry authentication
+        RemoteConfigurationRegistryJAASConfig.configure(registryConfigs, aliasService);
+
+        if (registryConfigs.size() > 1) {
+            // Warn about current limit on number of supported client configurations
+            log.multipleRemoteRegistryConfigurations();
+        }
+
+        // Create the clients
+        for (RemoteConfigurationRegistryConfig registryConfig : registryConfigs) {
+            if (TYPE.equalsIgnoreCase(registryConfig.getRegistryType())) {
+                RemoteConfigurationRegistryClient registryClient = createClient(registryConfig);
+                clients.put(registryConfig.getName(), registryClient);
+            }
+        }
+    }
+
+    @Override
+    public void setAliasService(AliasService aliasService) {
+        this.aliasService = aliasService;
+    }
+
+    @Override
+    public void start() throws ServiceLifecycleException {
+    }
+
+    @Override
+    public void stop() throws ServiceLifecycleException {
+    }
+
+    @Override
+    public RemoteConfigurationRegistryClient get(String name) {
+        return clients.get(name);
+    }
+
+
+    private RemoteConfigurationRegistryClient createClient(RemoteConfigurationRegistryConfig config) {
+        ACLProvider aclProvider;
+        if (config.isSecureRegistry()) {
+            configureSasl(config);
+            aclProvider = new SASLOwnerACLProvider(config.getPrincipal());
+        } else {
+            // Clear SASL system property
+            System.clearProperty(LOGIN_CONTEXT_NAME_PROPERTY);
+            aclProvider = new DefaultACLProvider();
+        }
+
+        CuratorFramework client = CuratorFrameworkFactory.builder()
+                                                         .connectString(config.getConnectionString())
+                                                         .retryPolicy(new ExponentialBackoffRetry(1000, 3))
+                                                         .aclProvider(aclProvider)
+                                                         .build();
+        client.start();
+
+        return (new ClientAdapter(client, config));
+    }
+
+
+    private void configureSasl(RemoteConfigurationRegistryConfig config) {
+        String registryName = config.getName();
+        if (registryName == null) {
+            registryName = DEFAULT_LOGIN_CONTEXT_NAME;
+        }
+        System.setProperty(LOGIN_CONTEXT_NAME_PROPERTY, registryName);
+    }
+
+
+    private static final class ClientAdapter implements RemoteConfigurationRegistryClient {
+
+        private static final String DEFAULT_ENCODING = "UTF-8";
+
+        private CuratorFramework delegate;
+
+        private RemoteConfigurationRegistryConfig config;
+
+        private Map<String, NodeCache> entryNodeCaches = new HashMap<>();
+
+        ClientAdapter(CuratorFramework delegate, RemoteConfigurationRegistryConfig config) {
+            this.delegate = delegate;
+            this.config = config;
+        }
+
+        @Override
+        public String getAddress() {
+            return config.getConnectionString();
+        }
+
+        @Override
+        public boolean entryExists(String path) {
+            Stat s = null;
+            try {
+                s = delegate.checkExists().forPath(path);
+            } catch (Exception e) {
+                // Ignore
+            }
+            return (s != null);
+        }
+
+        @Override
+        public List<RemoteConfigurationRegistryClient.EntryACL> getACL(String path) {
+            List<RemoteConfigurationRegistryClient.EntryACL> acl = new ArrayList<>();
+            try {
+                List<ACL> zkACL = delegate.getACL().forPath(path);
+                if (zkACL != null) {
+                    for (ACL aclEntry : zkACL) {
+                        RemoteConfigurationRegistryClient.EntryACL entryACL = new ZooKeeperACLAdapter(aclEntry);
+                        acl.add(entryACL);
+                    }
+                }
+            } catch (Exception e) {
+                log.errorHandlingRemoteConfigACL(path, e);
+            }
+            return acl;
+        }
+
+        @Override
+        public List<String> listChildEntries(String path) {
+            List<String> result = null;
+            try {
+                result = delegate.getChildren().forPath(path);
+            } catch (Exception e) {
+                log.errorInteractingWithRemoteConfigRegistry(e);
+            }
+            return result;
+        }
+
+        @Override
+        public void addChildEntryListener(String path, ChildEntryListener listener) throws Exception {
+            PathChildrenCache childCache = new PathChildrenCache(delegate, path, false);
+            childCache.getListenable().addListener(new ChildEntryListenerAdapter(this, listener));
+            childCache.start();
+        }
+
+        @Override
+        public void addEntryListener(String path, EntryListener listener) throws Exception {
+            NodeCache nodeCache = new NodeCache(delegate, path);
+            nodeCache.getListenable().addListener(new EntryListenerAdapter(this, nodeCache, listener));
+            nodeCache.start();
+            entryNodeCaches.put(path, nodeCache);
+        }
+
+        @Override
+        public void removeEntryListener(String path) throws Exception {
+            NodeCache nodeCache = entryNodeCaches.remove(path);
+            if (nodeCache != null) {
+                nodeCache.close();
+            }
+        }
+
+        @Override
+        public String getEntryData(String path) {
+            return getEntryData(path, DEFAULT_ENCODING);
+        }
+
+        @Override
+        public String getEntryData(String path, String encoding) {
+            String result = null;
+            try {
+                byte[] data = delegate.getData().forPath(path);
+                if (data != null) {
+                    result = new String(data, Charset.forName(encoding));
+                }
+            } catch (Exception e) {
+                log.errorInteractingWithRemoteConfigRegistry(e);
+            }
+            return result;
+        }
+
+        @Override
+        public void createEntry(String path) {
+            try {
+                if (delegate.checkExists().forPath(path) == null) {
+                    delegate.create().forPath(path);
+                }
+            } catch (Exception e) {
+                log.errorInteractingWithRemoteConfigRegistry(e);
+            }
+        }
+
+        @Override
+        public void createEntry(String path, String data) {
+            createEntry(path, data, DEFAULT_ENCODING);
+        }
+
+        @Override
+        public void createEntry(String path, String data, String encoding) {
+            try {
+                createEntry(path);
+                setEntryData(path, data, encoding);
+            } catch (Exception e) {
+                log.errorInteractingWithRemoteConfigRegistry(e);
+            }
+        }
+
+        @Override
+        public int setEntryData(String path, String data) {
+            return setEntryData(path, data, DEFAULT_ENCODING);
+        }
+
+        @Override
+        public int setEntryData(String path, String data, String encoding) {
+            int version = 0;
+            try {
+                Stat s = delegate.setData().forPath(path, data.getBytes(Charset.forName(encoding)));
+                if (s != null) {
+                    version = s.getVersion();
+                }
+            } catch (Exception e) {
+                log.errorInteractingWithRemoteConfigRegistry(e);
+            }
+            return version;
+        }
+
+        @Override
+        public void deleteEntry(String path) {
+            try {
+                delegate.delete().forPath(path);
+            } catch (Exception e) {
+                log.errorInteractingWithRemoteConfigRegistry(e);
+            }
+        }
+    }
+
+    /**
+     * SASL ACLProvider
+     */
+    private static class SASLOwnerACLProvider implements ACLProvider {
+
+        private final List<ACL> saslACL;
+
+        private SASLOwnerACLProvider(String principal) {
+            this.saslACL = Collections.singletonList(new ACL(ZooDefs.Perms.ALL, new Id("sasl", principal)));
+        }
+
+        @Override
+        public List<ACL> getDefaultAcl() {
+            return saslACL;
+        }
+
+        @Override
+        public List<ACL> getAclForPath(String path) {
+            return getDefaultAcl();
+        }
+    }
+
+
+    private static final class ChildEntryListenerAdapter implements PathChildrenCacheListener {
+
+        private RemoteConfigurationRegistryClient client;
+        private ChildEntryListener delegate;
+
+        ChildEntryListenerAdapter(RemoteConfigurationRegistryClient client, ChildEntryListener delegate) {
+            this.client = client;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent)
+                throws Exception {
+            ChildData childData = pathChildrenCacheEvent.getData();
+            if (childData != null) {
+                delegate.childEvent(client,
+                                    adaptType(pathChildrenCacheEvent.getType()),
+                                    childData.getPath());
+            }
+        }
+
+        private ChildEntryListener.Type adaptType(PathChildrenCacheEvent.Type type) {
+            ChildEntryListener.Type adapted = null;
+
+            switch(type) {
+                case CHILD_ADDED:
+                    adapted = ChildEntryListener.Type.ADDED;
+                    break;
+                case CHILD_REMOVED:
+                    adapted = ChildEntryListener.Type.REMOVED;
+                    break;
+                case CHILD_UPDATED:
+                    adapted = ChildEntryListener.Type.UPDATED;
+                    break;
+            }
+
+            return adapted;
+        }
+    }
+
+    private static final class EntryListenerAdapter implements NodeCacheListener {
+
+        private RemoteConfigurationRegistryClient client;
+        private EntryListener delegate;
+        private NodeCache nodeCache;
+
+        EntryListenerAdapter(RemoteConfigurationRegistryClient client, NodeCache nodeCache, EntryListener delegate) {
+            this.client = client;
+            this.nodeCache = nodeCache;
+            this.delegate = delegate;
+        }
+
+        @Override
+        public void nodeChanged() throws Exception {
+            String path = null;
+            byte[] data = null;
+
+            ChildData cd = nodeCache.getCurrentData();
+            if (cd != null) {
+                path = cd.getPath();
+                data = cd.getData();
+            }
+
+            if (path != null) {
+                delegate.entryChanged(client, path, data);
+            }
+        }
+    }
+
+    /**
+     * ACL adapter
+     */
+    private static final class ZooKeeperACLAdapter implements RemoteConfigurationRegistryClient.EntryACL {
+        private String type;
+        private String id;
+        private Object permissions;
+
+        ZooKeeperACLAdapter(ACL acl) {
+            this.permissions = acl.getPerms();
+            this.type = acl.getId().getScheme();
+            this.id = acl.getId().getId();
+        }
+
+        @Override
+        public String getId() {
+            return id;
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+
+        @Override
+        public Object getPermissions() {
+            return permissions;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryJAASConfig.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryJAASConfig.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryJAASConfig.java
new file mode 100644
index 0000000..d51d7d5
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryJAASConfig.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.zk;
+
+import org.apache.hadoop.gateway.i18n.messages.MessagesFactory;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationMessages;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryConfig;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.hadoop.gateway.services.security.AliasServiceException;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Configuration decorator that adds SASL JAAS configuration to whatever JAAS config is already applied.
+ */
+class RemoteConfigurationRegistryJAASConfig extends Configuration {
+
+    // Underlying SASL mechanisms supported
+    enum SASLMechanism {
+        Kerberos,
+        Digest
+    }
+
+    static final Map<String, String> digestLoginModules = new HashMap<>();
+    static {
+        digestLoginModules.put("ZOOKEEPER", "org.apache.zookeeper.server.auth.DigestLoginModule");
+    }
+
+    private static final RemoteConfigurationMessages log = MessagesFactory.get(RemoteConfigurationMessages.class);
+
+    // Cache the current JAAS configuration
+    private Configuration delegate = Configuration.getConfiguration();
+
+    private AliasService aliasService;
+
+    private Map<String, AppConfigurationEntry[]> contextEntries =  new HashMap<>();
+
+    static RemoteConfigurationRegistryJAASConfig configure(List<RemoteConfigurationRegistryConfig> configs, AliasService aliasService) {
+        return new RemoteConfigurationRegistryJAASConfig(configs, aliasService);
+    }
+
+    private RemoteConfigurationRegistryJAASConfig(List<RemoteConfigurationRegistryConfig> configs, AliasService aliasService) {
+        this.aliasService = aliasService;
+
+        // Populate context entries
+        List<AppConfigurationEntry> appConfigEntries = new ArrayList<>();
+        for (RemoteConfigurationRegistryConfig config : configs) {
+            if (config.isSecureRegistry()) {
+                contextEntries.put(config.getName(), createEntries(config));
+            }
+        }
+
+        // If there is at least one context entry, then set this as the client configuration
+        if (!contextEntries.isEmpty()) {
+            // TODO: PJZ: ZooKeeper 3.6.0 will have per-client JAAS Configuration support; Upgrade ASAP!!
+            // For now, set this as the static JAAS configuration
+            Configuration.setConfiguration(this);
+        }
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+        AppConfigurationEntry[] result = null;
+
+        // First, try the delegate's context entries
+        result = delegate.getAppConfigurationEntry(name);
+        if (result == null || result.length < 1) {
+            // Try our additional context entries
+            result = contextEntries.get(name);
+        }
+
+        return result;
+    }
+
+    private AppConfigurationEntry[] createEntries(RemoteConfigurationRegistryConfig config) {
+        // Only supporting a single app config entry per configuration/context
+        AppConfigurationEntry[] result = new AppConfigurationEntry[1];
+        result[0] = createEntry(config);
+        return result;
+    }
+
+    private AppConfigurationEntry createEntry(RemoteConfigurationRegistryConfig config) {
+        AppConfigurationEntry entry = null;
+
+        Map<String, String> opts = new HashMap<>();
+        SASLMechanism saslMechanism = getSASLMechanism(config.getAuthType());
+        switch (saslMechanism) {
+            case Digest:
+                // Digest auth options
+                opts.put("username", config.getPrincipal());
+
+                char[] credential = null;
+                if (aliasService != null) {
+                    try {
+                        credential = aliasService.getPasswordFromAliasForGateway(config.getCredentialAlias());
+                    } catch (AliasServiceException e) {
+                        log.unresolvedCredentialAlias(config.getCredentialAlias());
+                    }
+                } else {
+                    throw new IllegalArgumentException("The AliasService is required to resolve credential aliases.");
+                }
+
+                if (credential != null) {
+                    opts.put("password", new String(credential));
+                }
+                break;
+            case Kerberos:
+                opts.put("isUseTicketCache", String.valueOf(config.isUseTicketCache()));
+                opts.put("isUseKeyTab", String.valueOf(config.isUseKeyTab()));
+                opts.put("keyTab", config.getKeytab());
+                opts.put("principal", config.getPrincipal());
+        }
+
+        entry = new AppConfigurationEntry(getLoginModuleName(config.getRegistryType(), saslMechanism),
+                                          AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                          opts);
+
+        return entry;
+    }
+
+    private static String getLoginModuleName(String registryType, SASLMechanism saslMechanism) {
+        String loginModuleName = null;
+
+        switch (saslMechanism) {
+            case Kerberos:
+                if (System.getProperty("java.vendor").contains("IBM")) {
+                    loginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
+                } else {
+                    loginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+                }
+                break;
+            case Digest:
+                loginModuleName = digestLoginModules.get(registryType.toUpperCase());
+        }
+        return loginModuleName;
+    }
+
+    private static SASLMechanism getSASLMechanism(String authType) {
+        SASLMechanism result = null;
+        for (SASLMechanism at : SASLMechanism.values()) {
+            if (at.name().equalsIgnoreCase(authType)) {
+                result = at;
+                break;
+            }
+        }
+        return result;
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientService.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientService.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientService.java
new file mode 100644
index 0000000..c4add4a
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientService.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.zk;
+
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService;
+
+/**
+ * ZooKeeper-specific {@link RemoteConfigurationRegistryClientService}; it declares no additional methods,
+ * only the type constant.
+ */
+public interface ZooKeeperClientService extends RemoteConfigurationRegistryClientService {
+
+    // Type name associated with this client service
+    String TYPE = "ZooKeeper";
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientServiceProvider.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientServiceProvider.java b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientServiceProvider.java
new file mode 100644
index 0000000..f30d3da
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/java/org/apache/hadoop/gateway/service/config/remote/zk/ZooKeeperClientServiceProvider.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.zk;
+
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceProvider;
+
+
+/**
+ * {@link RemoteConfigurationRegistryClientServiceProvider} for the ZooKeeper client service.
+ */
+public class ZooKeeperClientServiceProvider implements RemoteConfigurationRegistryClientServiceProvider {
+
+    /**
+     * @return {@link ZooKeeperClientService#TYPE}
+     */
+    @Override
+    public String getType() {
+        return ZooKeeperClientService.TYPE;
+    }
+
+    /**
+     * @return A new CuratorClientService instance on every call.
+     */
+    @Override
+    public ZooKeeperClientService newInstance() {
+        return new CuratorClientService();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/main/resources/META-INF/services/org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceProvider
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/main/resources/META-INF/services/org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceProvider b/gateway-service-remoteconfig/src/main/resources/META-INF/services/org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceProvider
new file mode 100644
index 0000000..7f2312a
--- /dev/null
+++ b/gateway-service-remoteconfig/src/main/resources/META-INF/services/org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceProvider
@@ -0,0 +1,19 @@
+##########################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+##########################################################################
+
+org.apache.hadoop.gateway.service.config.remote.zk.ZooKeeperClientServiceProvider

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/DefaultRemoteConfigurationRegistriesTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/DefaultRemoteConfigurationRegistriesTest.java b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/DefaultRemoteConfigurationRegistriesTest.java
new file mode 100644
index 0000000..a33fcc2
--- /dev/null
+++ b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/DefaultRemoteConfigurationRegistriesTest.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.config;
+
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.junit.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Tests the creation of remote configuration registry configurations from GatewayConfig properties.
+ */
+public class DefaultRemoteConfigurationRegistriesTest {
+
+    /**
+     * Test a single registry configuration with digest auth configuration.
+     */
+    @Test
+    public void testPropertiesRemoteConfigurationRegistriesSingleDigest() throws Exception {
+        Map<String, Properties> testProperties = new HashMap<>();
+        Properties p = new Properties();
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE, "ZooKeeper");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS, "hostx:2181");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL, "zkDigestUser");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE, "digest");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS, "zkDigestAlias");
+        testProperties.put("testDigest", p);
+
+        doTestPropertiesRemoteConfigurationRegistries(testProperties);
+    }
+
+
+    /**
+     * Test a single registry configuration with kerberos auth configuration.
+     */
+    @Test
+    public void testPropertiesRemoteConfigurationRegistriesSingleKerberos() throws Exception {
+        Map<String, Properties> testProperties = new HashMap<>();
+        Properties p = new Properties();
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE, "ZooKeeper");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS, "hostx:2181");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL, "zkUser");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE, "kerberos");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_KEYTAB, "/home/user/remoteregistry.keytab");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_USE_KEYTAB, "true");
+        p.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_USE_TICKET_CACHE, "false");
+        testProperties.put("testKerb", p);
+
+        doTestPropertiesRemoteConfigurationRegistries(testProperties);
+    }
+
+    /**
+     * Test multiple registry configuration with varying auth configurations.
+     */
+    @Test
+    public void testPropertiesRemoteConfigurationRegistriesMultipleMixed() throws Exception {
+        Map<String, Properties> testProperties = new HashMap<>();
+
+        Properties kerb = new Properties();
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE, "ZooKeeper");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS, "host1:2181");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_NAMESPACE, "/knox/config");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL, "kerbPrincipal");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE, "kerberos");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_KEYTAB, "/home/user/mykrb.keytab");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_USE_KEYTAB, "true");
+        kerb.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_USE_TICKET_CACHE, "false");
+        testProperties.put("testKerb1", kerb);
+
+        Properties digest = new Properties();
+        digest.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE, "ZooKeeper");
+        digest.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS, "host2:2181");
+        digest.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL, "digestPrincipal");
+        digest.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE, "digest");
+        digest.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS, "digestPwdAlias");
+        testProperties.put("testDigest1", digest);
+
+        Properties unsecured = new Properties();
+        unsecured.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE, "ZooKeeper");
+        unsecured.setProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS, "host2:2181");
+        testProperties.put("testUnsecured", unsecured);
+
+        doTestPropertiesRemoteConfigurationRegistries(testProperties);
+    }
+
+
+    /**
+     * Perform the actual test.
+     *
+     * @param testProperties The test properties, keyed by registry configuration name
+     */
+    private void doTestPropertiesRemoteConfigurationRegistries(Map<String, Properties> testProperties) throws Exception {
+        // Mock gateway config
+        GatewayConfig gc = mockGatewayConfig(testProperties);
+
+        // Create the RemoteConfigurationRegistries object to be tested from the GatewayConfig
+        RemoteConfigurationRegistries registries = new DefaultRemoteConfigurationRegistries(gc);
+
+        // Basic validation
+        assertNotNull(registries);
+        List<RemoteConfigurationRegistry> registryConfigs = registries.getRegistryConfigurations();
+        assertNotNull(registryConfigs);
+        assertEquals(testProperties.size(), registryConfigs.size());
+
+        // Validate the contents of the created object
+        for (RemoteConfigurationRegistry regConfig : registryConfigs) {
+            validateRemoteRegistryConfig(regConfig.getName(), testProperties.get(regConfig.getName()), regConfig);
+        }
+    }
+
+
+    /**
+     * Create a mock GatewayConfig based on the specified test properties.
+     * <p>
+     * Each registry's properties are flattened into a single "name=value;name=value" string, which the
+     * mock returns as the remote registry configuration for that registry name.
+     *
+     * @param testProperties The test properties to set on the config
+     *
+     * @return The mock, already in replay state
+     */
+    private GatewayConfig mockGatewayConfig(Map<String, Properties> testProperties) {
+        // Mock gateway config
+        GatewayConfig gc = EasyMock.createNiceMock(GatewayConfig.class);
+        List<String> configNames = new ArrayList<>();
+        for (Map.Entry<String, Properties> registry : testProperties.entrySet()) {
+            String registryName = registry.getKey();
+            configNames.add(registryName);
+
+            Properties props = registry.getValue();
+            StringBuilder propertyValueString = new StringBuilder();
+            Enumeration<?> names = props.propertyNames();
+            while (names.hasMoreElements()) {
+                String propertyName = (String) names.nextElement();
+                propertyValueString.append(propertyName).append('=').append(props.get(propertyName));
+                if (names.hasMoreElements()) {
+                    propertyValueString.append(';');
+                }
+            }
+            EasyMock.expect(gc.getRemoteRegistryConfiguration(registryName))
+                    .andReturn(propertyValueString.toString())
+                    .anyTimes();
+        }
+        EasyMock.expect(gc.getRemoteRegistryConfigurationNames()).andReturn(configNames).anyTimes();
+        EasyMock.replay(gc);
+
+        return gc;
+    }
+
+
+    /**
+     * Validate the specified RemoteConfigurationRegistry based on the expected test properties.
+     *
+     * @param configName     The expected registry configuration name
+     * @param expected       The properties from which the registry configuration was created
+     * @param registryConfig The registry configuration to validate
+     */
+    private void validateRemoteRegistryConfig(String                      configName,
+                                              Properties                  expected,
+                                              RemoteConfigurationRegistry registryConfig) throws Exception {
+        assertEquals(configName, registryConfig.getName());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE), registryConfig.getRegistryType());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS), registryConfig.getConnectionString());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_NAMESPACE), registryConfig.getNamespace());
+        // A registry is expected to be secure exactly when an auth type has been configured
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE) != null, registryConfig.isSecureRegistry());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE), registryConfig.getAuthType());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL), registryConfig.getPrincipal());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS), registryConfig.getCredentialAlias());
+        assertEquals(expected.get(GatewayConfig.REMOTE_CONFIG_REGISTRY_KEYTAB), registryConfig.getKeytab());
+        // Boolean.valueOf(null) is FALSE, so absent properties are expected to yield false
+        assertEquals(Boolean.valueOf(expected.getProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_USE_KEYTAB)), registryConfig.isUseKeyTab());
+        assertEquals(Boolean.valueOf(expected.getProperty(GatewayConfig.REMOTE_CONFIG_REGISTRY_USE_TICKET_CACHE)), registryConfig.isUseTicketCache());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistryConfigParserTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistryConfigParserTest.java b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistryConfigParserTest.java
new file mode 100644
index 0000000..386e332
--- /dev/null
+++ b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/config/RemoteConfigurationRegistryConfigParserTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.config;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryConfig;
+import org.apache.hadoop.gateway.service.config.remote.util.RemoteRegistryConfigTestUtils;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import static org.apache.hadoop.gateway.service.config.remote.util.RemoteRegistryConfigTestUtils.*;
+
+/**
+ * Tests the parsing of remote configuration registry configurations from an external XML file.
+ */
+public class RemoteConfigurationRegistryConfigParserTest {
+
+    /**
+     * Writes three registry configurations (digest, kerberos and one without auth properties) to a
+     * temporary XML file, parses that file, and compares every parsed configuration with its source
+     * properties.
+     */
+    @Test
+    public void testExternalXMLParsing() throws Exception {
+        final String CONN_STR = "http://my.zookeeper.host:2181";
+
+        Map<String, Map<String, String>> testRegistryConfigurations = new HashMap<>();
+
+        Map<String, String> config1 = new HashMap<>();
+        config1.put(PROPERTY_TYPE, "ZooKeeper");
+        config1.put(PROPERTY_NAME, "registry1");
+        config1.put(PROPERTY_ADDRESS, CONN_STR);
+        config1.put(PROPERTY_SECURE, "true");
+        config1.put(PROPERTY_AUTH_TYPE, "Digest");
+        config1.put(PROPERTY_PRINCIPAL, "knox");
+        config1.put(PROPERTY_CRED_ALIAS, "zkCredential");
+        testRegistryConfigurations.put(config1.get(PROPERTY_NAME), config1);
+
+        File myKeyTab = File.createTempFile("mytest", "keytab");
+        File registryConfigFile = null;
+        try {
+            Map<String, String> config2 = new HashMap<>();
+            config2.put(PROPERTY_TYPE, "ZooKeeper");
+            config2.put(PROPERTY_NAME, "MyKerberos");
+            config2.put(PROPERTY_ADDRESS, CONN_STR);
+            config2.put(PROPERTY_SECURE, "true");
+            config2.put(PROPERTY_AUTH_TYPE, "Kerberos");
+            config2.put(PROPERTY_PRINCIPAL, "knox");
+            config2.put(PROPERTY_KEYTAB, myKeyTab.getAbsolutePath());
+            config2.put(PROPERTY_USE_KEYTAB, "false");
+            config2.put(PROPERTY_USE_TICKET_CACHE, "true");
+            testRegistryConfigurations.put(config2.get(PROPERTY_NAME), config2);
+
+            Map<String, String> config3 = new HashMap<>();
+            config3.put(PROPERTY_TYPE, "ZooKeeper");
+            config3.put(PROPERTY_NAME, "anotherRegistry");
+            config3.put(PROPERTY_ADDRESS, "whatever:1281");
+            testRegistryConfigurations.put(config3.get(PROPERTY_NAME), config3);
+
+            String configXML =
+                    RemoteRegistryConfigTestUtils.createRemoteConfigRegistriesXML(testRegistryConfigurations.values());
+
+            registryConfigFile = File.createTempFile("remote-registries", "xml");
+            // The generated document declares utf-8, so write it with that encoding
+            FileUtils.writeStringToFile(registryConfigFile, configXML, "UTF-8");
+
+            List<RemoteConfigurationRegistryConfig> configs =
+                                    RemoteConfigurationRegistriesParser.getConfig(registryConfigFile.getAbsolutePath());
+            assertNotNull(configs);
+            assertEquals(testRegistryConfigurations.keySet().size(), configs.size());
+
+            for (RemoteConfigurationRegistryConfig registryConfig : configs) {
+                Map<String, String> expected = testRegistryConfigurations.get(registryConfig.getName());
+                assertNotNull(expected);
+                validateParsedRegistryConfiguration(registryConfig, expected);
+            }
+        } finally {
+            // Remove both temporary files, regardless of the test outcome
+            if (registryConfigFile != null) {
+                registryConfigFile.delete();
+            }
+            myKeyTab.delete();
+        }
+    }
+
+    /**
+     * Compare a parsed registry configuration with the properties from which its XML was generated.
+     *
+     * @param config   The parsed registry configuration
+     * @param expected The source properties; absent boolean properties are expected to be parsed as false
+     */
+    private void validateParsedRegistryConfiguration(RemoteConfigurationRegistryConfig config,
+                                                     Map<String, String> expected) throws Exception {
+        assertEquals(expected.get(PROPERTY_TYPE), config.getRegistryType());
+        assertEquals(expected.get(PROPERTY_ADDRESS), config.getConnectionString());
+        assertEquals(expected.get(PROPERTY_NAME), config.getName());
+        assertEquals(expected.get(PROPERTY_NAMESAPCE), config.getNamespace());
+        assertEquals(Boolean.valueOf(expected.get(PROPERTY_SECURE)), config.isSecureRegistry());
+        assertEquals(expected.get(PROPERTY_AUTH_TYPE), config.getAuthType());
+        assertEquals(expected.get(PROPERTY_PRINCIPAL), config.getPrincipal());
+        assertEquals(expected.get(PROPERTY_CRED_ALIAS), config.getCredentialAlias());
+        assertEquals(expected.get(PROPERTY_KEYTAB), config.getKeytab());
+        assertEquals(Boolean.valueOf(expected.get(PROPERTY_USE_KEYTAB)), config.isUseKeyTab());
+        assertEquals(Boolean.valueOf(expected.get(PROPERTY_USE_TICKET_CACHE)), config.isUseTicketCache());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/util/RemoteRegistryConfigTestUtils.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/util/RemoteRegistryConfigTestUtils.java b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/util/RemoteRegistryConfigTestUtils.java
new file mode 100644
index 0000000..35919d0
--- /dev/null
+++ b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/util/RemoteRegistryConfigTestUtils.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.util;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Test helpers for generating remote configuration registry XML documents from simple property maps.
+ *
+ * Property values are XML-escaped when they are written into element content, so values containing
+ * '&', '<' or '>' still yield a well-formed document.
+ */
+public class RemoteRegistryConfigTestUtils {
+
+    public static final String PROPERTY_TYPE = "type";
+    public static final String PROPERTY_NAME = "name";
+    public static final String PROPERTY_ADDRESS = "address";
+    public static final String PROPERTY_NAMESAPCE = "namespace";
+    public static final String PROPERTY_SECURE = "secure";
+    public static final String PROPERTY_AUTH_TYPE = "authType";
+    public static final String PROPERTY_PRINCIPAL = "principal";
+    public static final String PROPERTY_CRED_ALIAS = "credentialAlias";
+    public static final String PROPERTY_KEYTAB = "keyTab";
+    public static final String PROPERTY_USE_KEYTAB = "useKeyTab";
+    public static final String PROPERTY_USE_TICKET_CACHE = "useTicketCache";
+
+    /**
+     * Build a remote configuration registries XML document from a collection of registry property sets.
+     *
+     * Each property set yields one remote-configuration-registry element. The form of that element is chosen by
+     * the set's authType property, compared case-insensitively: "Kerberos", "Digest", or anything else (including
+     * a missing value) for a registry element without authentication details.
+     *
+     * @param configProperties The registry property sets, keyed by the PROPERTY_* names defined in this class.
+     *
+     * @return The XML document.
+     */
+    public static String createRemoteConfigRegistriesXML(Collection<Map<String, String>> configProperties) {
+        StringBuilder result = new StringBuilder();
+        result.append("<?xml version=\"1.0\" encoding=\"utf-8\"?>\n")
+              .append("<remote-configuration-registries>\n");
+
+        for (Map<String, String> props : configProperties) {
+            String authType = props.get(PROPERTY_AUTH_TYPE);
+            if ("Kerberos".equalsIgnoreCase(authType)) {
+                result.append(
+                    createRemoteConfigRegistryXMLWithKerberosAuth(props.get(PROPERTY_TYPE),
+                                                                  props.get(PROPERTY_NAME),
+                                                                  props.get(PROPERTY_ADDRESS),
+                                                                  props.get(PROPERTY_PRINCIPAL),
+                                                                  props.get(PROPERTY_KEYTAB),
+                                                                  Boolean.valueOf(props.get(PROPERTY_USE_KEYTAB)),
+                                                                  Boolean.valueOf(props.get(PROPERTY_USE_TICKET_CACHE))));
+            } else if ("Digest".equalsIgnoreCase(authType)) {
+                result.append(
+                    createRemoteConfigRegistryXMLWithDigestAuth(props.get(PROPERTY_TYPE),
+                                                                props.get(PROPERTY_NAME),
+                                                                props.get(PROPERTY_ADDRESS),
+                                                                props.get(PROPERTY_PRINCIPAL),
+                                                                props.get(PROPERTY_CRED_ALIAS)));
+            } else {
+                result.append(createRemoteConfigRegistryXMLNoAuth(props.get(PROPERTY_TYPE),
+                                                                  props.get(PROPERTY_NAME),
+                                                                  props.get(PROPERTY_ADDRESS)));
+            }
+        }
+
+        result.append("</remote-configuration-registries>\n");
+
+        return result.toString();
+    }
+
+    /**
+     * Build a single remote-configuration-registry element for a registry using Kerberos authentication.
+     * The secure element is always written as true.
+     *
+     * @param type           The registry type.
+     * @param name           The registry configuration name.
+     * @param address        The registry address.
+     * @param principal      The principal.
+     * @param keyTab         The keytab value.
+     * @param useKeyTab      The value for the use-keytab element.
+     * @param useTicketCache The value for the use-ticket-cache element.
+     *
+     * @return The XML element, terminated by a newline.
+     */
+    public static String createRemoteConfigRegistryXMLWithKerberosAuth(String type,
+                                                                       String name,
+                                                                       String address,
+                                                                       String principal,
+                                                                       String keyTab,
+                                                                       boolean useKeyTab,
+                                                                       boolean useTicketCache) {
+        return "  <remote-configuration-registry>\n" +
+               "    <name>" + escape(name) + "</name>\n" +
+               "    <type>" + escape(type) + "</type>\n" +
+               "    <address>" + escape(address) + "</address>\n" +
+               "    <secure>true</secure>\n" +
+               "    <auth-type>" + "Kerberos" + "</auth-type>\n" +
+               "    <principal>" + escape(principal) + "</principal>\n" +
+               "    <keytab>" + escape(keyTab) + "</keytab>\n" +
+               "    <use-keytab>" + useKeyTab + "</use-keytab>\n" +
+               "    <use-ticket-cache>" + useTicketCache + "</use-ticket-cache>\n" +
+               "  </remote-configuration-registry>\n";
+    }
+
+    /**
+     * Build a single remote-configuration-registry element for a registry using digest authentication.
+     * The secure element is always written as true.
+     *
+     * @param type            The registry type.
+     * @param name            The registry configuration name.
+     * @param address         The registry address.
+     * @param principal       The principal.
+     * @param credentialAlias The value for the credential-alias element.
+     *
+     * @return The XML element, terminated by a newline.
+     */
+    public static String createRemoteConfigRegistryXMLWithDigestAuth(String type,
+                                                                     String name,
+                                                                     String address,
+                                                                     String principal,
+                                                                     String credentialAlias) {
+        return "  <remote-configuration-registry>\n" +
+               "    <name>" + escape(name) + "</name>\n" +
+               "    <type>" + escape(type) + "</type>\n" +
+               "    <address>" + escape(address) + "</address>\n" +
+               "    <secure>true</secure>\n" +
+               "    <auth-type>" + "Digest" + "</auth-type>\n" +
+               "    <principal>" + escape(principal) + "</principal>\n" +
+               "    <credential-alias>" + escape(credentialAlias) + "</credential-alias>\n" +
+               "  </remote-configuration-registry>\n";
+    }
+
+
+    /**
+     * Build a single remote-configuration-registry element containing only the name, type and address.
+     *
+     * @param type    The registry type.
+     * @param name    The registry configuration name.
+     * @param address The registry address.
+     *
+     * @return The XML element, terminated by a newline.
+     */
+    public static String createRemoteConfigRegistryXMLNoAuth(String type,
+                                                             String name,
+                                                             String address) {
+        return "  <remote-configuration-registry>\n" +
+               "    <name>" + escape(name) + "</name>\n" +
+               "    <type>" + escape(type) + "</type>\n" +
+               "    <address>" + escape(address) + "</address>\n" +
+               "  </remote-configuration-registry>\n";
+    }
+
+    /**
+     * Escape the characters that are significant in XML element content. A null value is rendered as the text
+     * "null", exactly as plain String concatenation would render it.
+     */
+    private static String escape(String value) {
+        // '&' must be replaced first so the entities introduced below are not escaped a second time
+        return String.valueOf(value)
+                     .replace("&", "&amp;")
+                     .replace("<", "&lt;")
+                     .replace(">", "&gt;");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/knox/blob/5af2413c/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryClientServiceTest.java
----------------------------------------------------------------------
diff --git a/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryClientServiceTest.java b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryClientServiceTest.java
new file mode 100644
index 0000000..0292ee3
--- /dev/null
+++ b/gateway-service-remoteconfig/src/test/java/org/apache/hadoop/gateway/service/config/remote/zk/RemoteConfigurationRegistryClientServiceTest.java
@@ -0,0 +1,424 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.gateway.service.config.remote.zk;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.InstanceSpec;
+import org.apache.curator.test.TestingCluster;
+import org.apache.hadoop.gateway.config.GatewayConfig;
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient.ChildEntryListener;
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClient;
+import org.apache.hadoop.gateway.services.config.client.RemoteConfigurationRegistryClientService;
+import org.apache.hadoop.gateway.service.config.remote.RemoteConfigurationRegistryClientServiceFactory;
+import org.apache.hadoop.gateway.service.config.remote.util.RemoteRegistryConfigTestUtils;
+import org.apache.hadoop.gateway.services.security.AliasService;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.easymock.EasyMock;
+import org.junit.Test;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class RemoteConfigurationRegistryClientServiceTest {
+
+    /**
+     * Test a configuration for an unsecured remote registry, included in the gateway configuration.
+     *
+     * The principal, password and credential alias are all null, so the test cluster is set up without
+     * authentication and the client is exercised as a non-secure client.
+     */
+    @Test
+    public void testUnsecuredZooKeeperWithSimpleRegistryConfig() throws Exception {
+        final String REGISTRY_CLIENT_NAME = "unsecured-zk-registry-name";
+        final String PRINCIPAL = null;
+        final String PWD = null;
+        final String CRED_ALIAS = null;
+
+        // Configure and start an unsecured ZK cluster (a null principal means no authentication is applied)
+        TestingCluster zkCluster = setupAndStartSecureTestZooKeeper(PRINCIPAL, PWD);
+
+        // Create the setup client for the test cluster, and initialize the test znodes.
+        // try-with-resources closes the setup client before the finally block stops the cluster.
+        try (CuratorFramework setupClient = initializeTestClientAndZNodes(zkCluster, PRINCIPAL)) {
+            // Mock configuration
+            GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+            final String registryConfigValue =
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
+                        GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString();
+            EasyMock.expect(config.getRemoteRegistryConfiguration(REGISTRY_CLIENT_NAME))
+                    .andReturn(registryConfigValue)
+                    .anyTimes();
+            EasyMock.expect(config.getRemoteRegistryConfigurationNames())
+                    .andReturn(Collections.singletonList(REGISTRY_CLIENT_NAME)).anyTimes();
+            EasyMock.replay(config);
+
+            doTestZooKeeperClient(setupClient, REGISTRY_CLIENT_NAME, config, CRED_ALIAS, PWD);
+        } finally {
+            zkCluster.stop();
+        }
+    }
+
+    /**
+     * Test multiple configurations for an unsecured remote registry.
+     *
+     * Two differently named registry clients are configured against the same unsecured test cluster, and each is
+     * exercised through the same client service instance.
+     */
+    @Test
+    public void testMultipleUnsecuredZooKeeperWithSimpleRegistryConfig() throws Exception {
+        final String REGISTRY_CLIENT_NAME_1 = "zkclient1";
+        final String REGISTRY_CLIENT_NAME_2 = "zkclient2";
+        final String PRINCIPAL = null;
+        final String PWD = null;
+        final String CRED_ALIAS = null;
+
+        // Configure and start an unsecured ZK cluster (a null principal means no authentication is applied)
+        TestingCluster zkCluster = setupAndStartSecureTestZooKeeper(PRINCIPAL, PWD);
+
+        // Create the setup client for the test cluster, and initialize the test znodes.
+        // try-with-resources closes the setup client before the finally block stops the cluster.
+        try (CuratorFramework setupClient = initializeTestClientAndZNodes(zkCluster, PRINCIPAL)) {
+            // Mock configuration
+            GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+            final String registryConfigValue1 =
+                                GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
+                                GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString();
+            EasyMock.expect(config.getRemoteRegistryConfiguration(REGISTRY_CLIENT_NAME_1))
+                    .andReturn(registryConfigValue1).anyTimes();
+            final String registryConfigValue2 =
+                                GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
+                                GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString();
+            EasyMock.expect(config.getRemoteRegistryConfiguration(REGISTRY_CLIENT_NAME_2))
+                    .andReturn(registryConfigValue2).anyTimes();
+            EasyMock.expect(config.getRemoteRegistryConfigurationNames())
+                    .andReturn(Arrays.asList(REGISTRY_CLIENT_NAME_1, REGISTRY_CLIENT_NAME_2)).anyTimes();
+            EasyMock.replay(config);
+
+            // Create the client service instance
+            RemoteConfigurationRegistryClientService clientService =
+                    RemoteConfigurationRegistryClientServiceFactory.newInstance(config);
+            // JUnit expects (message, expected, actual); this order keeps the failure message accurate
+            assertEquals("Wrong registry client service type.", CuratorClientService.class, clientService.getClass());
+            clientService.setAliasService(null);
+            clientService.init(config, null);
+            clientService.start();
+
+            RemoteConfigurationRegistryClient client1 = clientService.get(REGISTRY_CLIENT_NAME_1);
+            assertNotNull(client1);
+
+            RemoteConfigurationRegistryClient client2 = clientService.get(REGISTRY_CLIENT_NAME_2);
+            assertNotNull(client2);
+
+            doTestZooKeeperClient(setupClient, REGISTRY_CLIENT_NAME_1, clientService, false);
+            doTestZooKeeperClient(setupClient, REGISTRY_CLIENT_NAME_2, clientService, false);
+        } finally {
+            zkCluster.stop();
+        }
+    }
+
+    /**
+     * Test a configuration for a secure remote registry, included in the gateway configuration.
+     *
+     * The registry definition (type, address, digest auth type, principal and credential alias) is supplied as a
+     * single semicolon-delimited value returned by the mocked gateway configuration.
+     */
+    @Test
+    public void testZooKeeperWithSimpleRegistryConfig() throws Exception {
+        final String AUTH_TYPE = "digest";
+        final String REGISTRY_CLIENT_NAME = "zk-registry-name";
+        final String PRINCIPAL = "knox";
+        final String PWD = "knoxtest";
+        final String CRED_ALIAS = "zkCredential";
+
+        // Configure and start a secure ZK cluster
+        TestingCluster zkCluster = setupAndStartSecureTestZooKeeper(PRINCIPAL, PWD);
+
+        // Create the setup client for the test cluster, and initialize the test znodes.
+        // try-with-resources closes the setup client before the finally block stops the cluster.
+        try (CuratorFramework setupClient = initializeTestClientAndZNodes(zkCluster, PRINCIPAL)) {
+            // Mock configuration
+            GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+            final String registryConfigValue =
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_TYPE + "=" + ZooKeeperClientService.TYPE + ";" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_ADDRESS + "=" + zkCluster.getConnectString() + ";" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_AUTH_TYPE + "=" + AUTH_TYPE + ";" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_PRINCIPAL + "=" + PRINCIPAL + ";" +
+                            GatewayConfig.REMOTE_CONFIG_REGISTRY_CREDENTIAL_ALIAS + "=" + CRED_ALIAS;
+            EasyMock.expect(config.getRemoteRegistryConfiguration(REGISTRY_CLIENT_NAME))
+                    .andReturn(registryConfigValue)
+                    .anyTimes();
+            EasyMock.expect(config.getRemoteRegistryConfigurationNames())
+                    .andReturn(Collections.singletonList(REGISTRY_CLIENT_NAME)).anyTimes();
+            EasyMock.replay(config);
+
+            doTestZooKeeperClient(setupClient, REGISTRY_CLIENT_NAME, config, CRED_ALIAS, PWD);
+        } finally {
+            zkCluster.stop();
+        }
+    }
+
+    /**
+     * Test the remote registry configuration external to, and referenced from, the gateway configuration, for a secure
+     * client.
+     *
+     * The registry definition is written to a temporary XML file whose path is published through the
+     * org.apache.knox.gateway.remote.registry.config.file system property; the mocked gateway configuration itself
+     * defines no registries. The file and the system property are removed again when the test finishes.
+     */
+    @Test
+    public void testZooKeeperWithSingleExternalRegistryConfig() throws Exception {
+        final String AUTH_TYPE = "digest";
+        final String REGISTRY_CLIENT_NAME = "my-zookeeper_registryNAME";
+        final String PRINCIPAL = "knox";
+        final String PWD = "knoxtest";
+        final String CRED_ALIAS = "zkCredential";
+
+        // Configure and start a secure ZK cluster
+        TestingCluster zkCluster = setupAndStartSecureTestZooKeeper(PRINCIPAL, PWD);
+
+        File tmpRegConfigFile = null;
+
+        // Create the setup client for the test cluster, and initialize the test znodes.
+        // try-with-resources closes the setup client before the finally block stops the cluster.
+        try (CuratorFramework setupClient = initializeTestClientAndZNodes(zkCluster, PRINCIPAL)) {
+            // Registry configuration properties, keyed by the names the XML generator reads
+            Map<String, String> registryConfigProps = new HashMap<>();
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_TYPE, ZooKeeperClientService.TYPE);
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_NAME, REGISTRY_CLIENT_NAME);
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_ADDRESS, zkCluster.getConnectString());
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_SECURE, "true");
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_AUTH_TYPE, AUTH_TYPE);
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_PRINCIPAL, PRINCIPAL);
+            registryConfigProps.put(RemoteRegistryConfigTestUtils.PROPERTY_CRED_ALIAS, CRED_ALIAS);
+            String registryConfigXML =
+                  RemoteRegistryConfigTestUtils.createRemoteConfigRegistriesXML(Collections.singleton(registryConfigProps));
+            // The suffix is appended verbatim, so the dot must be part of it to get a ".xml" extension
+            tmpRegConfigFile = File.createTempFile("myRemoteRegistryConfig", ".xml");
+            // Write with the encoding declared in the generated XML prolog rather than the platform default
+            FileUtils.writeStringToFile(tmpRegConfigFile, registryConfigXML, "UTF-8");
+
+            System.setProperty("org.apache.knox.gateway.remote.registry.config.file", tmpRegConfigFile.getAbsolutePath());
+
+            // Mock configuration
+            GatewayConfig config = EasyMock.createNiceMock(GatewayConfig.class);
+            EasyMock.replay(config);
+
+            doTestZooKeeperClient(setupClient, REGISTRY_CLIENT_NAME, config, CRED_ALIAS, PWD);
+        } finally {
+            zkCluster.stop();
+            if (tmpRegConfigFile != null && tmpRegConfigFile.exists()) {
+                tmpRegConfigFile.delete();
+            }
+            System.clearProperty("org.apache.knox.gateway.remote.registry.config.file");
+        }
+    }
+
+    /**
+     * Set up and start a three-instance test ZooKeeper cluster, applying SASL digest authentication only when a
+     * principal is supplied.
+     *
+     * When authentication is applied, this replaces the JVM-wide JAAS Configuration with one that answers only the
+     * "Server" login context.
+     *
+     * @param principal      The principal to register with the server; if null, no authentication is configured.
+     * @param digestPassword The password registered for the principal.
+     *
+     * @return The started cluster.
+     */
+    private TestingCluster setupAndStartSecureTestZooKeeper(String principal, String digestPassword) throws Exception {
+        final boolean secured = (principal != null);
+
+        // Security-related properties shared by every ZK instance in the cluster
+        final Map<String, Object> instanceProps = new HashMap<>();
+        if (secured) {
+            instanceProps.put("authProvider.1", "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+            instanceProps.put("requireClientAuthScheme", "sasl");
+        }
+
+        // Define the three instances of the test cluster, numbered 1 through 3
+        final List<InstanceSpec> specs = new ArrayList<>();
+        int instanceNumber = 1;
+        while (instanceNumber <= 3) {
+            specs.add(new InstanceSpec(null, -1, -1, -1, false, instanceNumber, -1, -1, instanceProps));
+            instanceNumber++;
+        }
+        final TestingCluster cluster = new TestingCluster(specs);
+
+        if (secured) {
+            // Server-side SASL: a digest login module that knows the single test principal
+            final Map<String, String> loginOptions = new HashMap<>();
+            loginOptions.put("user_" + principal, digestPassword);
+            final AppConfigurationEntry serverEntry =
+                    new AppConfigurationEntry("org.apache.zookeeper.server.auth.DigestLoginModule",
+                                              AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                                              loginOptions);
+            final AppConfigurationEntry[] serverEntries = new AppConfigurationEntry[] { serverEntry };
+            Configuration.setConfiguration(new Configuration() {
+                @Override
+                public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+                    if ("Server".equalsIgnoreCase(name)) {
+                        return serverEntries;
+                    }
+                    return null;
+                }
+            });
+        }
+
+        cluster.start();
+        return cluster;
+    }
+
+    /**
+     * Create a ZooKeeper client with SASL digest auth configured, and initialize the test znodes.
+     */
+    private CuratorFramework initializeTestClientAndZNodes(TestingCluster zkCluster, String principal) throws Exception {
+        // Create the client for the test cluster
+        CuratorFramework setupClient = CuratorFrameworkFactory.builder()
+                                                              .connectString(zkCluster.getConnectString())
+                                                              .retryPolicy(new ExponentialBackoffRetry(100, 3))
+                                                              .build();
+        assertNotNull(setupClient);
+        setupClient.start();
+
+        List<ACL> acls = new ArrayList<>();
+        if (principal != null) {
+            acls.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", principal)));
+        } else {
+            acls.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        }
+        setupClient.create().creatingParentsIfNeeded().withACL(acls).forPath("/knox/config/descriptors");
+        setupClient.create().creatingParentsIfNeeded().withACL(acls).forPath("/knox/config/shared-providers");
+
+        List<ACL> negativeACLs = new ArrayList<>();
+        if (principal != null) {
+            negativeACLs.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", "notyou")));
+        } else {
+            negativeACLs.add(new ACL(ZooDefs.Perms.ALL, ZooDefs.Ids.ANYONE_ID_UNSAFE));
+        }
+        setupClient.create().creatingParentsIfNeeded().withACL(negativeACLs).forPath("/someotherconfig");
+
+        return setupClient;
+    }
+
+    /**
+     * Create, initialize and start a registry client service from the given gateway configuration, then run the
+     * client interaction checks against it.
+     *
+     * The test is treated as secure only when both the credential alias and the digest password are non-null. The
+     * mocked alias service returns the digest password for the credential alias in that case, and null otherwise.
+     *
+     * @param setupClient     The client used for interacting with ZooKeeper independent from the registry client service.
+     * @param testClientName  The name of the client to use from the registry client service.
+     * @param config          The gateway configuration from which the client service is created.
+     * @param credentialAlias The alias under which the digest password is looked up, or null.
+     * @param digestPassword  The digest password, or null.
+     */
+    private void doTestZooKeeperClient(final CuratorFramework setupClient,
+                                       final String           testClientName,
+                                       final GatewayConfig    config,
+                                       final String           credentialAlias,
+                                       final String           digestPassword) throws Exception {
+        boolean isSecureTest = (credentialAlias != null && digestPassword != null);
+
+        // Mock alias service
+        AliasService aliasService = EasyMock.createNiceMock(AliasService.class);
+        EasyMock.expect(aliasService.getPasswordFromAliasForGateway(credentialAlias))
+                .andReturn(isSecureTest ? digestPassword.toCharArray() : null)
+                .anyTimes();
+        EasyMock.replay(aliasService);
+
+        // Create the client service instance
+        RemoteConfigurationRegistryClientService clientService =
+                RemoteConfigurationRegistryClientServiceFactory.newInstance(config);
+        // JUnit expects (message, expected, actual); this order keeps the failure message accurate
+        assertEquals("Wrong registry client service type.", CuratorClientService.class, clientService.getClass());
+        clientService.setAliasService(aliasService);
+        clientService.init(config, null);
+        clientService.start();
+
+        doTestZooKeeperClient(setupClient, testClientName, clientService, isSecureTest);
+    }
+
+    /**
+     * Test ZooKeeper client interactions, for either a secure or an unsecured client.
+     *
+     * Lists the children of the test znodes, then creates, reads, updates and deletes an entry under a new znode
+     * while child and entry listeners are registered, and finally checks that the listeners recorded something.
+     *
+     * @param setupClient    The client used for interacting with ZooKeeper independent from the registry client service.
+     * @param testClientName The name of the client to use from the registry client service.
+     * @param clientService  The RemoteConfigurationRegistryClientService
+     * @param isSecureTest   Flag to indicate whether this is a secure interaction test
+     */
+    private void doTestZooKeeperClient(final CuratorFramework                         setupClient,
+                                       final String                                   testClientName,
+                                       final RemoteConfigurationRegistryClientService clientService,
+                                       boolean                                        isSecureTest) throws Exception {
+
+        RemoteConfigurationRegistryClient client = clientService.get(testClientName);
+        assertNotNull(client);
+        List<String> descriptors = client.listChildEntries("/knox/config/descriptors");
+        assertNotNull(descriptors);
+        for (String descriptor : descriptors) {
+            System.out.println("Descriptor: " + descriptor);
+        }
+
+        List<String> providerConfigs = client.listChildEntries("/knox/config/shared-providers");
+        assertNotNull(providerConfigs);
+        for (String providerConfig : providerConfigs) {
+            System.out.println("Provider config: " + providerConfig);
+        }
+
+        List<String> someotherConfig = client.listChildEntries("/someotherconfig");
+        if (isSecureTest) {
+            assertNull("Expected null because of the ACL mismatch.", someotherConfig);
+        } else {
+            assertNotNull(someotherConfig);
+        }
+
+        // Test listeners
+        final String MY_NEW_ZNODE = "/clientServiceTestNode";
+        final String MY_NEW_DATA_ZNODE = MY_NEW_ZNODE + "/mydata";
+
+        if (setupClient.checkExists().forPath(MY_NEW_ZNODE) != null) {
+            setupClient.delete().deletingChildrenIfNeeded().forPath(MY_NEW_ZNODE);
+        }
+
+        // Synchronized because the listeners may be invoked on a thread other than the test thread (the pauses
+        // below exist to wait for them), while the test thread reads the log at the end.
+        final List<String> listenerLog = Collections.synchronizedList(new ArrayList<String>());
+        client.addChildEntryListener(MY_NEW_ZNODE, (c, type, path) -> {
+            listenerLog.add("EXTERNAL: " + type.toString() + ":" + path);
+            if (ChildEntryListener.Type.ADDED.equals(type)) {
+                try {
+                    c.addEntryListener(path, (cc, p, d) -> listenerLog.add("EXTERNAL: " + p + ":" + (d != null ? new String(d) : "null")));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        client.createEntry(MY_NEW_ZNODE);
+        client.createEntry(MY_NEW_DATA_ZNODE, "more test data");
+        String testData = client.getEntryData(MY_NEW_DATA_ZNODE);
+        assertNotNull(testData);
+        assertEquals("more test data", testData);
+
+        assertTrue(client.entryExists(MY_NEW_DATA_ZNODE));
+        client.setEntryData(MY_NEW_DATA_ZNODE, "still more data");
+
+        pauseForListenerEvents();
+
+        client.setEntryData(MY_NEW_DATA_ZNODE, "changed completely");
+
+        pauseForListenerEvents();
+
+        client.deleteEntry(MY_NEW_DATA_ZNODE);
+
+        pauseForListenerEvents();
+
+        assertFalse(listenerLog.isEmpty());
+    }
+
+    /**
+     * Pause the test thread for one second to give registered listeners a chance to be notified.
+     */
+    private static void pauseForListenerEvents() {
+        try {
+            Thread.sleep(1000);
+        } catch (InterruptedException e) {
+            // Restore the interrupt status instead of silently discarding it
+            Thread.currentThread().interrupt();
+        }
+    }
+
+}