You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2020/03/02 17:50:31 UTC

[nifi] branch master updated: NIFI-7026 Add kerberos password property to NiFi HortonworksSchemaRegistry

This is an automated email from the ASF dual-hosted git repository.

jstorck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fa855c  NIFI-7026 Add kerberos password property to NiFi HortonworksSchemaRegistry
     new c74e713  Merge pull request #4071 from bbende/NIFI-7026
8fa855c is described below

commit 8fa855c8c3947151213bae830721c220ef20ef20
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Wed Feb 19 14:40:13 2020 -0500

    NIFI-7026 Add kerberos password property to NiFi HortonworksSchemaRegistry
---
 NOTICE                                             |    5 +
 .../nifi/security/krb/KerberosPasswordUser.java    |   65 +-
 .../nifi/security/krb/PasswordConfiguration.java   |   46 +
 .../krb/UsernamePasswordCallbackHandler.java       |   57 +
 .../nifi-hwx-schema-registry-service/pom.xml       |  151 +-
 .../client/SchemaRegistryClient.java               | 1498 ++++++++++++++++++++
 .../hortonworks/HortonworksSchemaRegistry.java     |   81 +-
 .../hortonworks/KerberosUserLogin.java             |   69 +
 .../SchemaRegistryClientWithKerberosPassword.java  |   90 ++
 9 files changed, 1861 insertions(+), 201 deletions(-)

diff --git a/NOTICE b/NOTICE
index 877bf11..298543a 100644
--- a/NOTICE
+++ b/NOTICE
@@ -112,3 +112,8 @@ This includes derived works from Dropwizard Metrics available under Apache Softw
   and can be found in
     nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
     nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
+
+This includes derived works from Cloudera Schema Registry available under Apache Software License V2 (https://github.com/hortonworks/registry)
+  Cloudera Schema Registry
+  Copyright 2016-2019 Cloudera, Inc.
+  The code can be found in nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
\ No newline at end of file
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java
index d81fc85..8f9a22a 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java
@@ -19,17 +19,10 @@ package org.apache.nifi.security.krb;
 import org.apache.commons.lang3.Validate;
 
 import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
 import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
 import javax.security.auth.login.Configuration;
 import javax.security.auth.login.LoginContext;
 import javax.security.auth.login.LoginException;
-import java.io.IOException;
-import java.util.HashMap;
 
 /**
  * KerberosUser that authenticates via username and password instead of keytab.
@@ -46,65 +39,9 @@ public class KerberosPasswordUser extends AbstractKerberosUser {
 
     @Override
     protected LoginContext createLoginContext(final Subject subject) throws LoginException {
-        final Configuration configuration = new PasswordConfig();
+        final Configuration configuration = new PasswordConfiguration();
         final CallbackHandler callbackHandler = new UsernamePasswordCallbackHandler(principal, password);
         return new LoginContext("PasswordConf", subject, callbackHandler, configuration);
     }
 
-    /**
-     * JAAS Configuration to use when logging in with username/password.
-     */
-    private static class PasswordConfig extends Configuration {
-
-        @Override
-        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
-            HashMap<String, String> options = new HashMap<String, String>();
-            options.put("storeKey", "true");
-            options.put("refreshKrb5Config", "true");
-
-            final String krbLoginModuleName = ConfigurationUtil.IS_IBM
-                    ? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
-
-            return new AppConfigurationEntry[] {
-                    new AppConfigurationEntry(
-                            krbLoginModuleName,
-                            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
-                            options
-                    )
-            };
-        }
-
-    }
-
-    /**
-     * CallbackHandler that provides the given username and password.
-     */
-    private static class UsernamePasswordCallbackHandler implements CallbackHandler {
-
-        private final String username;
-        private final String password;
-
-        public UsernamePasswordCallbackHandler(final String username, final String password) {
-            this.username = username;
-            this.password = password;
-            Validate.notBlank(this.username);
-            Validate.notBlank(this.password);
-        }
-
-        public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
-            for (final Callback callback : callbacks) {
-                if (callback instanceof NameCallback) {
-                    final NameCallback nameCallback = (NameCallback) callback;
-                    nameCallback.setName(username);
-                } else if (callback instanceof PasswordCallback) {
-                    final PasswordCallback passwordCallback = (PasswordCallback) callback;
-                    passwordCallback.setPassword(password.toCharArray());
-                } else {
-                    throw new IllegalStateException("Unexpected callback type: " + callback.getClass().getCanonicalName());
-                }
-            }
-        }
-
-    }
-
 }
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/PasswordConfiguration.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/PasswordConfiguration.java
new file mode 100644
index 0000000..04ec7d6
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/PasswordConfiguration.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.krb;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.HashMap;
+
+/**
+ * JAAS Configuration to use when logging in with username/password.
+ */
+public class PasswordConfiguration extends Configuration {
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+        HashMap<String, String> options = new HashMap<String, String>();
+        options.put("storeKey", "true");
+        options.put("refreshKrb5Config", "true");
+
+        final String krbLoginModuleName = ConfigurationUtil.IS_IBM
+                ? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
+
+        return new AppConfigurationEntry[] {
+                new AppConfigurationEntry(
+                        krbLoginModuleName,
+                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+                        options
+                )
+        };
+    }
+
+}
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/UsernamePasswordCallbackHandler.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/UsernamePasswordCallbackHandler.java
new file mode 100644
index 0000000..222a6fe
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/UsernamePasswordCallbackHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.krb;
+
+import org.apache.commons.lang3.Validate;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import java.io.IOException;
+
+/**
+ * CallbackHandler that provides the given username and password.
+ */
+public class UsernamePasswordCallbackHandler implements CallbackHandler {
+
+    private final String username;
+    private final String password;
+
+    public UsernamePasswordCallbackHandler(final String username, final String password) {
+        this.username = username;
+        this.password = password;
+        Validate.notBlank(this.username);
+        Validate.notBlank(this.password);
+    }
+
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+        for (final Callback callback : callbacks) {
+            if (callback instanceof NameCallback) {
+                final NameCallback nameCallback = (NameCallback) callback;
+                nameCallback.setName(username);
+            } else if (callback instanceof PasswordCallback) {
+                final PasswordCallback passwordCallback = (PasswordCallback) callback;
+                passwordCallback.setPassword(password.toCharArray());
+            } else {
+                throw new IllegalStateException("Unexpected callback type: " + callback.getClass().getCanonicalName());
+            }
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
index 3581576..dcbf5a1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
@@ -67,121 +67,13 @@ limitations under the License.
             <version>1.8.1</version>
         </dependency>
  
-        <!-- Schema Registry Serdes Jar that also pulls in registry client jars -->
+        <!-- Schema Registry Client-->
         <dependency>
             <groupId>com.hortonworks.registries</groupId>
             <artifactId>schema-registry-client</artifactId>
             <version>${hwx.registry.version}</version>
             <exclusions>
                 <exclusion>
-                    <groupId>org.apache.avro</groupId>
-                    <artifactId>avro</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.kafka</groupId>
-                    <artifactId>kafka-clients</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.jboss.logging</groupId>
-                    <artifactId>jboss-logging</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.directory.server</groupId>
-                    <artifactId>apacheds-i18n</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.directory.server</groupId>
-                    <artifactId>apacheds-kerberos-codec</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.servlet.jsp</groupId>
-                    <artifactId>jsp-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.javassist</groupId>
-                    <artifactId>javassist</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.tukaani</groupId>
-                    <artifactId>xz</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.thoughtworks.paranamer</groupId>
-                    <artifactId>paranamer</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.hibernate</groupId>
-                    <artifactId>hibernate-validator</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-annotations</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.fusesource.leveldbjni</groupId>
-                    <artifactId>leveldbjni-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>io.netty</groupId>
-                    <artifactId>netty-all</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-auth</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-client</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-common</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.glassfish.hk2</groupId>
-                    <artifactId>osgi-resource-locator</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.fge</groupId>
-                    <artifactId>btf</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>javax.mail</groupId>
-                    <artifactId>mailapi</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.googlecode.libphonenumber</groupId>
-                    <artifactId>libphonenumber</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.yaml</groupId>
-                    <artifactId>snakeyaml</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.fge</groupId>
-                    <artifactId>msg-simple</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.xerial.snappy</groupId>
-                    <artifactId>snappy-java</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.glassfish.hk2.external</groupId>
-                    <artifactId>aopalliance-repackaged</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>joda-time</groupId>
-                    <artifactId>joda-time</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.fasterxml.jackson.dataformat</groupId>
-                    <artifactId>jackson-dataformat-yaml</artifactId>
-                </exclusion>
-                <exclusion>
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-databind</artifactId>
                 </exclusion>
@@ -193,33 +85,26 @@ limitations under the License.
                     <groupId>com.fasterxml.jackson.core</groupId>
                     <artifactId>jackson-core</artifactId>
                 </exclusion>
-                <exclusion>
-                    <groupId>org.mozilla</groupId>
-                    <artifactId>rhino</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.fge</groupId>
-                    <artifactId>uri-template</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.fge</groupId>
-                    <artifactId>jackson-coreutils</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.fge</groupId>
-                    <artifactId>json-schema-core</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>com.github.fge</groupId>
-                    <artifactId>json-schema-validator</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.commons</groupId>
-                    <artifactId>commons-compress</artifactId>
-                </exclusion>
             </exclusions>
         </dependency>
 
+        <!-- Upgrade jersey to 2.26 -->
+        <dependency>
+            <groupId>org.glassfish.jersey.core</groupId>
+            <artifactId>jersey-client</artifactId>
+            <version>2.26</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.media</groupId>
+            <artifactId>jersey-media-multipart</artifactId>
+            <version>2.26</version>
+        </dependency>
+        <dependency>
+            <groupId>org.glassfish.jersey.inject</groupId>
+            <artifactId>jersey-hk2</artifactId>
+            <version>2.26</version>
+        </dependency>
+
         <!-- explicitly pulling in jackson and excluding above to not conflict with transitive test dependencies from nifi-mock -->
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
new file mode 100644
index 0000000..8162f6d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
@@ -0,0 +1,1498 @@
+/**
+ * Copyright 2016-2019 Cloudera, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.hortonworks.registries.schemaregistry.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+import com.hortonworks.registries.auth.KerberosLogin;
+import com.hortonworks.registries.auth.Login;
+import com.hortonworks.registries.auth.NOOPLogin;
+import com.hortonworks.registries.auth.util.JaasConfiguration;
+import com.hortonworks.registries.common.SchemaRegistryServiceInfo;
+import com.hortonworks.registries.common.SchemaRegistryVersion;
+import com.hortonworks.registries.common.catalog.CatalogResponse;
+import com.hortonworks.registries.common.util.ClassLoaderAwareInvocationHandler;
+import com.hortonworks.registries.schemaregistry.CompatibilityResult;
+import com.hortonworks.registries.schemaregistry.ConfigEntry;
+import com.hortonworks.registries.schemaregistry.SchemaBranch;
+import com.hortonworks.registries.schemaregistry.SchemaFieldQuery;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaProviderInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersion;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
+import com.hortonworks.registries.schemaregistry.SchemaVersionMergeResult;
+import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
+import com.hortonworks.registries.schemaregistry.SerDesInfo;
+import com.hortonworks.registries.schemaregistry.SerDesPair;
+import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache;
+import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException;
+import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaBranchDeletionException;
+import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException;
+import com.hortonworks.registries.schemaregistry.errors.SchemaBranchAlreadyExistsException;
+import com.hortonworks.registries.schemaregistry.errors.SchemaBranchNotFoundException;
+import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+import com.hortonworks.registries.schemaregistry.exceptions.RegistryRetryableException;
+import com.hortonworks.registries.schemaregistry.serde.SerDesException;
+import com.hortonworks.registries.schemaregistry.serde.SnapshotDeserializer;
+import com.hortonworks.registries.schemaregistry.serde.SnapshotSerializer;
+import com.hortonworks.registries.schemaregistry.serde.pull.PullDeserializer;
+import com.hortonworks.registries.schemaregistry.serde.pull.PullSerializer;
+import com.hortonworks.registries.schemaregistry.serde.push.PushDeserializer;
+import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException;
+import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo;
+import org.apache.commons.io.IOUtils;
+import org.glassfish.jersey.SslConfigurator;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.glassfish.jersey.media.multipart.BodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.MultiPart;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.glassfish.jersey.media.multipart.file.StreamDataBodyPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.security.auth.login.LoginException;
+import javax.ws.rs.BadRequestException;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Proxy;
+import java.net.URLEncoder;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_CONNECTION_TIMEOUT;
+import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_READ_TIMEOUT;
+import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL;
+
+/**
+ * NOTE: This class is a copy of the SchemaRegistryClient from https://github.com/hortonworks/registry.
+ * The changes to this file are the following:
+ *      - Changed 'private Login login' to protected so that sub-classes can access it
+ *      - Changed 'private Configuration configuration' to protected so that sub-classes can access it
+ *
+ * This is the default implementation of {@link ISchemaRegistryClient} which connects to the given {@code rootCatalogURL}.
+ * <p>
+ * An instance of SchemaRegistryClient can be instantiated by passing configuration properties like below.
+ * <pre>
+ *     SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
+ * </pre>
+ * <p>
+ * There are different options available as mentioned in {@link Configuration} like
+ * <pre>
+ * - {@link Configuration#SCHEMA_REGISTRY_URL}.
+ * - {@link Configuration#SCHEMA_METADATA_CACHE_SIZE}.
+ * - {@link Configuration#SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS}.
+ * - {@link Configuration#SCHEMA_VERSION_CACHE_SIZE}.
+ * - {@link Configuration#SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS}.
+ * - {@link Configuration#SCHEMA_TEXT_CACHE_SIZE}.
+ * - {@link Configuration#SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS}.
+ *
+ * and many other properties like {@link ClientProperties}
+ * </pre>
+ * <pre>
+ * This can be used to
+ *      - register schema metadata
+ *      - add new versions of a schema
+ *      - fetch different versions of schema
+ *      - fetch latest version of a schema
+ *      - check whether the given schema text is compatible with the latest version of the schema
+ *      - register serializer/deserializer for a schema
+ *      - fetch serializer/deserializer for a schema
+ * </pre>
+ */
+public class SchemaRegistryClient implements ISchemaRegistryClient {
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryClient.class);
+
+    private static final String SCHEMA_REGISTRY_PATH = "/schemaregistry";
+    private static final String SCHEMAS_PATH = SCHEMA_REGISTRY_PATH + "/schemas/";
+    private static final String SCHEMA_PROVIDERS_PATH = SCHEMA_REGISTRY_PATH + "/schemaproviders/";
+    private static final String SCHEMAS_BY_ID_PATH = SCHEMA_REGISTRY_PATH + "/schemasById/";
+    private static final String SCHEMA_VERSIONS_PATH = SCHEMAS_PATH + "versions/";
+    private static final String FILES_PATH = SCHEMA_REGISTRY_PATH + "/files/";
+    private static final String SERIALIZERS_PATH = SCHEMA_REGISTRY_PATH + "/serdes/";
+    private static final String REGISTY_CLIENT_JAAS_SECTION = "RegistryClient";
+    private static final Set<Class<?>> DESERIALIZER_INTERFACE_CLASSES = Sets.<Class<?>>newHashSet(SnapshotDeserializer.class, PullDeserializer.class, PushDeserializer.class);
+    private static final Set<Class<?>> SERIALIZER_INTERFACE_CLASSES = Sets.<Class<?>>newHashSet(SnapshotSerializer.class, PullSerializer.class);
+    private static final String SEARCH_FIELDS = SCHEMA_REGISTRY_PATH + "/search/schemas/fields";
+    private static final long KERBEROS_SYNCHRONIZATION_TIMEOUT_MS = 180000;
+
+    private static final String SSL_KEY_PASSWORD = "keyPassword";
+    private static final String SSL_KEY_STORE_PATH = "keyStorePath";
+
+    private static final SchemaRegistryVersion CLIENT_VERSION = SchemaRegistryServiceInfo.get().version();
+
+    protected Login login;
+    private final Client client;
+    private final UrlSelector urlSelector;
+    private final Map<String, SchemaRegistryTargets> urlWithTargets;
+
+    protected final Configuration configuration;
+    private final ClassLoaderCache classLoaderCache;
+    private final SchemaVersionInfoCache schemaVersionInfoCache;
+    private final SchemaMetadataCache schemaMetadataCache;
+    private final Cache<SchemaDigestEntry, SchemaIdVersion> schemaTextCache;
+
+    private static final String SSL_CONFIGURATION_KEY = "schema.registry.client.ssl";
+    private static final String HOSTNAME_VERIFIER_CLASS_KEY = "hostnameVerifierClass";
+
+    /**
+     * Creates {@link SchemaRegistryClient} instance with the given yaml config.
+     *
+     * @param confFile config file which contains the configuration entries.
+     *
+     * @throws IOException when any IOException occurs while reading the given confFile
+     */
+    public SchemaRegistryClient(File confFile) throws IOException {
+        this(buildConfFromFile(confFile));
+    }
+
+    private static Map<String, ?> buildConfFromFile(File confFile) throws IOException {
+        try (FileInputStream fis = new FileInputStream(confFile)) {
+            return (Map<String, Object>) new Yaml().load(IOUtils.toString(fis, "UTF-8"));
+        }
+    }
+
+    /**
+     * Creates a client from the given configuration map.
+     *
+     * Sets up, in order: the {@link Configuration}, the security context (login), the Jersey
+     * client (with an optional SSL context and hostname verifier), the URL selector, and the
+     * class-loader, schema-version, schema-metadata and schema-text caches.
+     *
+     * @param conf configuration entries; SSL settings, when present, are read from the map
+     *             stored under {@code SSL_CONFIGURATION_KEY}
+     *
+     * @throws RuntimeException if the configured hostname verifier class cannot be instantiated
+     */
+    public SchemaRegistryClient(Map<String, ?> conf) {
+        configuration = new Configuration(conf);
+        // initializeSecurityContext() reads 'configuration', so it must run after the assignment above.
+        initializeSecurityContext();
+        ClientConfig config = createClientConfig(conf);
+        ClientBuilder clientBuilder = JerseyClientBuilder.newBuilder()
+                .withConfig(config)
+                .property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE);
+        // SSL is configured only when the caller supplied an SSL settings map.
+        if (conf.containsKey(SSL_CONFIGURATION_KEY)) {
+            Map<String, String> sslConfigurations = (Map<String, String>) conf.get(SSL_CONFIGURATION_KEY);
+            clientBuilder.sslContext(createSSLContext(sslConfigurations));
+            if (sslConfigurations.containsKey(HOSTNAME_VERIFIER_CLASS_KEY)) {
+                HostnameVerifier hostNameVerifier = null;
+                String hostNameVerifierClassName = sslConfigurations.get(HOSTNAME_VERIFIER_CLASS_KEY);
+                try {
+                    // The verifier is created reflectively through its no-arg constructor.
+                    hostNameVerifier = (HostnameVerifier) Class.forName(hostNameVerifierClassName).newInstance();
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to instantiate hostNameVerifierClass : " + hostNameVerifierClassName, e);
+                }
+                clientBuilder.hostnameVerifier(hostNameVerifier);
+            }
+        }
+        client = clientBuilder.build();
+        client.register(MultiPartFeature.class);
+
+        // get list of urls and create given or default UrlSelector.
+        urlSelector = createUrlSelector();
+        urlWithTargets = new ConcurrentHashMap<>();
+
+        classLoaderCache = new ClassLoaderCache(this);
+
+        // The retriever callbacks fetch schema versions from the registry on behalf of the cache.
+        // NOTE(review): only this cache multiplies its expiry interval (a *_SECS setting) by 1000,
+        // presumably because SchemaVersionInfoCache expects milliseconds - confirm.
+        schemaVersionInfoCache = new SchemaVersionInfoCache(
+                new SchemaVersionRetriever() {
+                    @Override
+                    public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
+                        return doGetSchemaVersionInfo(key);
+                    }
+
+                    @Override
+                    public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
+                        return doGetSchemaVersionInfo(key);
+                    }
+                },
+                ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(),
+                ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L
+        );
+
+        SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = createSchemaMetadataFetcher();
+        schemaMetadataCache = new SchemaMetadataCache(((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_SIZE
+                .name())).longValue(),
+                ((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS
+                        .name())).longValue(),
+                schemaMetadataFetcher);
+
+        // Cache from (schema name, schema text digest) to the registered schema id/version.
+        schemaTextCache = CacheBuilder.newBuilder()
+                .maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE
+                        .name())).longValue())
+                .expireAfterAccess(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS
+                                .name())).longValue(),
+                        TimeUnit.SECONDS)
+                .build();
+    }
+
+    /**
+     * Establishes the {@code login} used to execute requests against the registry.
+     *
+     * Resolution order:
+     * <ol>
+     *   <li>If a dynamic JAAS config is present under {@code SASL_JAAS_CONFIG}, log in with it.</li>
+     *   <li>Otherwise, or if that attempt fails, log in with the static JAAS config file named by the
+     *       {@code java.security.auth.login.config} system property.</li>
+     *   <li>If no static config is defined, or the static login fails, use a {@link NOOPLogin}.</li>
+     * </ol>
+     *
+     * This method is invoked from the constructor, so overriding implementations run before
+     * the subclass constructor body has executed.
+     */
+    protected void initializeSecurityContext() {
+        String saslJaasConfig = configuration.getValue(Configuration.SASL_JAAS_CONFIG.name());
+        if (saslJaasConfig != null) {
+            KerberosLogin kerberosLogin = new KerberosLogin(KERBEROS_SYNCHRONIZATION_TIMEOUT_MS);
+            try {
+                kerberosLogin.configure(new HashMap<>(), REGISTY_CLIENT_JAAS_SECTION, new JaasConfiguration(REGISTY_CLIENT_JAAS_SECTION, saslJaasConfig));
+                kerberosLogin.login();
+                login = kerberosLogin;
+                return;
+            } catch (LoginException e) {
+                // Pass the exception along so the reason for the failed login is not lost.
+                LOG.error("Failed to initialize the dynamic JAAS config: " + saslJaasConfig + ". Attempting static JAAS config.", e);
+            } catch (Exception e) {
+                LOG.error("Failed to parse the dynamic JAAS config. Attempting static JAAS config.", e);
+            }
+        }
+
+        String jaasConfigFile = System.getProperty("java.security.auth.login.config");
+        if (jaasConfigFile != null && !jaasConfigFile.trim().isEmpty()) {
+            KerberosLogin kerberosLogin = new KerberosLogin(KERBEROS_SYNCHRONIZATION_TIMEOUT_MS);
+            kerberosLogin.configure(new HashMap<>(), REGISTY_CLIENT_JAAS_SECTION);
+            try {
+                kerberosLogin.login();
+                login = kerberosLogin;
+            } catch (LoginException e) {
+                // Falling back to a no-op login is deliberate; still record why the login failed.
+                LOG.error("Could not login using jaas config  section " + REGISTY_CLIENT_JAAS_SECTION, e);
+                login = new NOOPLogin();
+            }
+        } else {
+            LOG.warn("System property for jaas config file is not defined. Its okay if schema registry is not running in secured mode");
+            login = new NOOPLogin();
+        }
+    }
+
+    protected SSLContext createSSLContext(Map<String, String> sslConfigurations) {
+        SslConfigurator sslConfigurator = SslConfigurator.newInstance();
+        if (sslConfigurations.containsKey(SSL_KEY_STORE_PATH)) {
+            sslConfigurator.keyStoreType(sslConfigurations.get("keyStoreType"))
+                    .keyStoreFile(sslConfigurations.get(SSL_KEY_STORE_PATH))
+                    .keyStorePassword(sslConfigurations.get("keyStorePassword"))
+                    .keyStoreProvider(sslConfigurations.get("keyStoreProvider"))
+                    .keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm"))
+                    .keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider"));
+            if (sslConfigurations.containsKey(SSL_KEY_PASSWORD)) {
+                sslConfigurator.keyPassword(sslConfigurations.get(SSL_KEY_PASSWORD));
+            }
+        }
+
+
+        sslConfigurator.trustStoreType(sslConfigurations.get("trustStoreType"))
+                .trustStoreFile(sslConfigurations.get("trustStorePath"))
+                .trustStorePassword(sslConfigurations.get("trustStorePassword"))
+                .trustStoreProvider(sslConfigurations.get("trustStoreProvider"))
+                .trustManagerFactoryAlgorithm(sslConfigurations.get("trustManagerFactoryAlgorithm"))
+                .trustManagerFactoryProvider(sslConfigurations.get("trustManagerFactoryProvider"));
+
+        sslConfigurator.securityProtocol(sslConfigurations.get("protocol"));
+
+        return sslConfigurator.createSSLContext();
+    }
+
+    private SchemaRegistryTargets currentSchemaRegistryTargets() {
+        String url = urlSelector.select();
+        urlWithTargets.computeIfAbsent(url, s -> new SchemaRegistryTargets(client.target(s)));
+        return urlWithTargets.get(url);
+    }
+
+    /**
+     * Holds the {@link WebTarget}s derived from a single registry root target.
+     */
+    private static class SchemaRegistryTargets {
+        private final WebTarget rootTarget;
+        private final WebTarget schemaProvidersTarget;
+        private final WebTarget schemasTarget;
+        private final WebTarget schemasByIdTarget;
+        private final WebTarget schemaVersionsByIdTarget;
+        private final WebTarget schemaVersionsTarget;
+        private final WebTarget schemaVersionsStatesMachineTarget;
+        private final WebTarget searchFieldsTarget;
+        private final WebTarget serializersTarget;
+        private final WebTarget filesTarget;
+
+        /**
+         * Derives every sub-target from the given root.
+         */
+        SchemaRegistryTargets(WebTarget root) {
+            this.rootTarget = root;
+            this.schemaProvidersTarget = root.path(SCHEMA_PROVIDERS_PATH);
+
+            // Targets nested under the schemas resource.
+            this.schemasTarget = root.path(SCHEMAS_PATH);
+            this.schemaVersionsByIdTarget = this.schemasTarget.path("versionsById");
+            this.schemasByIdTarget = root.path(SCHEMAS_BY_ID_PATH);
+
+            // Targets nested under the schema versions resource.
+            this.schemaVersionsTarget = root.path(SCHEMA_VERSIONS_PATH);
+            this.schemaVersionsStatesMachineTarget = this.schemaVersionsTarget.path("statemachine");
+
+            this.searchFieldsTarget = root.path(SEARCH_FIELDS);
+            this.serializersTarget = root.path(SERIALIZERS_PATH);
+            this.filesTarget = root.path(FILES_PATH);
+        }
+
+    }
+
+    /**
+     * Creates and initializes the {@link UrlSelector} for the configured registry URL.
+     *
+     * Uses the selector class named in the configuration when one is given (instantiated through
+     * its single-String constructor), otherwise a {@link LoadBalancedFailoverUrlSelector}.
+     *
+     * @throws RuntimeException if the configured selector class cannot be loaded or instantiated
+     */
+    private UrlSelector createUrlSelector() {
+        String registryUrl = configuration.getValue(SCHEMA_REGISTRY_URL.name());
+        String selectorClassName = configuration.getValue(Configuration.URL_SELECTOR_CLASS.name());
+
+        UrlSelector selector;
+        if (selectorClassName != null) {
+            try {
+                selector = (UrlSelector) Class.forName(selectorClassName)
+                        .getConstructor(String.class)
+                        .newInstance(registryUrl);
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
+                    | InvocationTargetException reflectionFailure) {
+                throw new RuntimeException(reflectionFailure);
+            }
+        } else {
+            selector = new LoadBalancedFailoverUrlSelector(registryUrl);
+        }
+
+        selector.init(configuration.getConfig());
+        return selector;
+    }
+
+    private SchemaMetadataCache.SchemaMetadataFetcher createSchemaMetadataFetcher() {
+        return new SchemaMetadataCache.SchemaMetadataFetcher() {
+            @Override
+            public SchemaMetadataInfo fetch(String name) throws SchemaNotFoundException {
+                try {
+                    return getEntity(currentSchemaRegistryTargets().schemasTarget.path(name), SchemaMetadataInfo.class);
+                } catch (NotFoundException e) {
+                    throw new SchemaNotFoundException(e);
+                }
+            }
+
+            @Override
+            public SchemaMetadataInfo fetch(Long id) throws SchemaNotFoundException {
+                try {
+                    return getEntity(currentSchemaRegistryTargets().schemasByIdTarget.path(id.toString()), SchemaMetadataInfo.class);
+                } catch (NotFoundException e) {
+                    throw new SchemaNotFoundException(e);
+                }
+            }
+        };
+    }
+
+    protected ClientConfig createClientConfig(Map<String, ?> conf) {
+        ClientConfig config = new ClientConfig();
+        config.property(ClientProperties.CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT);
+        config.property(ClientProperties.READ_TIMEOUT, DEFAULT_READ_TIMEOUT);
+        config.property(ClientProperties.FOLLOW_REDIRECTS, true);
+        for (Map.Entry<String, ?> entry : conf.entrySet()) {
+            config.property(entry.getKey(), entry.getValue());
+        }
+        return config;
+    }
+
+    /**
+     * @return the {@link Configuration} this client was created with
+     */
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * Fetches the schema providers from the currently selected registry.
+     */
+    @Override
+    public Collection<SchemaProviderInfo> getSupportedSchemaProviders() {
+        final WebTarget providersTarget = currentSchemaRegistryTargets().schemaProvidersTarget;
+        return getEntities(providersTarget, SchemaProviderInfo.class);
+    }
+
+    /**
+     * Registers the given schema metadata; delegates to {@link #addSchemaMetadata(SchemaMetadata)}.
+     */
+    @Override
+    public Long registerSchemaMetadata(SchemaMetadata schemaMetadata) {
+        return addSchemaMetadata(schemaMetadata);
+    }
+
+    /**
+     * Returns the id of the schema metadata with the given name, taken from the local cache
+     * when present and otherwise obtained by registering the metadata with the registry.
+     */
+    @Override
+    public Long addSchemaMetadata(SchemaMetadata schemaMetadata) {
+        final SchemaMetadataInfo cachedInfo =
+                schemaMetadataCache.getIfPresent(SchemaMetadataCache.Key.of(schemaMetadata.getName()));
+
+        if (cachedInfo != null) {
+            return cachedInfo.getId();
+        }
+        return doRegisterSchemaMetadata(schemaMetadata, currentSchemaRegistryTargets().schemasTarget);
+    }
+
+    /**
+     * Posts the given metadata for the named schema and, when the registry returns a result,
+     * stores it in the metadata cache under the schema name.
+     *
+     * @return the metadata info returned by the registry, possibly {@code null}
+     */
+    @Override
+    public SchemaMetadataInfo updateSchemaMetadata(String schemaName, SchemaMetadata schemaMetadata) {
+        final WebTarget schemaTarget = currentSchemaRegistryTargets().schemasTarget.path(schemaName);
+        final SchemaMetadataInfo updatedInfo = postEntity(schemaTarget, schemaMetadata, SchemaMetadataInfo.class);
+        if (updatedInfo == null) {
+            return null;
+        }
+        schemaMetadataCache.put(SchemaMetadataCache.Key.of(schemaName), updatedInfo);
+        return updatedInfo;
+    }
+
+
+    private Long doRegisterSchemaMetadata(SchemaMetadata schemaMetadata, WebTarget schemasTarget) {
+        try {
+            return postEntity(schemasTarget, schemaMetadata, Long.class);
+        } catch(BadRequestException ex) {
+            Response response = ex.getResponse();
+            CatalogResponse catalogResponse = SchemaRegistryClient.readCatalogResponse(response.readEntity(String.class));
+            if(catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.ENTITY_CONFLICT.getCode()) {
+                return getSchemaMetadataInfo(schemaMetadata.getName()).getId();
+            } else {
+                throw ex;
+            }
+        }
+    }
+
+    /**
+     * Looks up schema metadata by schema name through the metadata cache.
+     */
+    @Override
+    public SchemaMetadataInfo getSchemaMetadataInfo(String schemaName) {
+        return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaName));
+    }
+
+    /**
+     * Looks up schema metadata by schema metadata id through the metadata cache.
+     */
+    @Override
+    public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) {
+        return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaMetadataId));
+    }
+
+    /**
+     * Deletes the named schema from the registry.
+     *
+     * Before issuing the request, the cached metadata for the schema and the cached entries of
+     * all its versions are invalidated.
+     *
+     * @param schemaName name of the schema to delete
+     * @throws SchemaNotFoundException if the registry answers 404
+     * @throws RegistryRetryableException if the request cannot be run because login failed
+     * @throws RuntimeException if the registry answers 500
+     */
+    @Override
+    public void deleteSchema(String schemaName) throws SchemaNotFoundException {
+        Collection<SchemaVersionInfo> schemaVersionInfos = getAllVersions(schemaName);
+        schemaMetadataCache.invalidateSchemaMetadata(SchemaMetadataCache.Key.of(schemaName));
+        if (schemaVersionInfos != null) {
+            for (SchemaVersionInfo schemaVersionInfo: schemaVersionInfos) {
+                SchemaIdVersion schemaIdVersion = new SchemaIdVersion(schemaVersionInfo.getId());
+                schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
+            }
+        }
+
+        WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s", schemaName));
+        Response response = null;
+        try {
+            response = login.doAction(new PrivilegedAction<Response>() {
+                @Override
+                public Response run() {
+                    return target.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
+                }
+            });
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+
+        try {
+            int status = response.getStatus();
+            if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+                throw new SchemaNotFoundException(response.readEntity(String.class));
+            } else if (status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
+                throw new RuntimeException(response.readEntity(String.class));
+            }
+        } finally {
+            // The entity is not read on the success path, so the response must be closed
+            // explicitly to release the underlying connection.
+            response.close();
+        }
+    }
+
+    /**
+     * Adds the schema version on the master branch; delegates to the branch-aware overload
+     * with {@code SchemaBranch.MASTER_BRANCH}.
+     */
+    @Override
+    public SchemaIdVersion addSchemaVersion(SchemaMetadata schemaMetadata, SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws
+            InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+        return addSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaMetadata, schemaVersion, disableCanonicalCheck);
+    }
+
+    /**
+     * Adds a schema version on the given branch, registering the schema metadata first when needed.
+     *
+     * When the (schema name, schema text digest) pair is already in the schema text cache the
+     * cached id/version is returned without contacting the registry.
+     *
+     * @throws RuntimeException if the schema metadata could not be registered
+     */
+    @Override
+    public SchemaIdVersion addSchemaVersion(String schemaBranchName, SchemaMetadata schemaMetadata, SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws
+            InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+        // Serve from the cache when this exact schema text was added before.
+        final SchemaDigestEntry digestEntry = buildSchemaTextEntry(schemaVersion, schemaMetadata.getName());
+        final SchemaIdVersion cachedIdVersion = schemaTextCache.getIfPresent(digestEntry);
+        if (cachedIdVersion != null) {
+            return cachedIdVersion;
+        }
+
+        // Make sure the schema metadata exists before adding a version to it.
+        final Long metadataId = registerSchemaMetadata(schemaMetadata);
+        if (metadataId == null) {
+            LOG.error("Schema Metadata [{}] is not registered successfully", schemaMetadata);
+            throw new RuntimeException("Given SchemaMetadata could not be registered: " + schemaMetadata);
+        }
+
+        return addSchemaVersion(schemaBranchName, schemaMetadata.getName(), schemaVersion, disableCanonicalCheck);
+    }
+
+    /**
+     * Uploads a schema version to the master branch; delegates to the branch-aware overload
+     * with {@code SchemaBranch.MASTER_BRANCH}.
+     */
+    @Override
+    public SchemaIdVersion uploadSchemaVersion(String schemaName, String description, InputStream schemaVersionTextFile) throws
+            InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+        return uploadSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaName, description, schemaVersionTextFile);
+    }
+
+    /**
+     * Uploads schema text from a stream as a new version of an existing schema on the given branch.
+     *
+     * The stream is sent as the "file" part of a multipart form together with a "description" field.
+     *
+     * @throws SchemaNotFoundException if no metadata exists for the schema name
+     * @throws RegistryRetryableException if the request cannot be run because login failed
+     */
+    public SchemaIdVersion uploadSchemaVersion(final String schemaBranchName,
+                                               final String schemaName,
+                                               final String description,
+                                               final InputStream schemaVersionInputStream)
+            throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+
+        final SchemaMetadataInfo metadataInfo = getSchemaMetadataInfo(schemaName);
+        if (metadataInfo == null) {
+            throw new SchemaNotFoundException("Schema with name " + schemaName + " not found");
+        }
+
+        final StreamDataBodyPart filePart = new StreamDataBodyPart("file", schemaVersionInputStream);
+
+        final WebTarget uploadTarget = currentSchemaRegistryTargets().schemasTarget
+                .path(schemaName)
+                .path("/versions/upload")
+                .queryParam("branch",schemaBranchName);
+
+        final MultiPart formData = new FormDataMultiPart()
+                .field("description", description, MediaType.APPLICATION_JSON_TYPE)
+                .bodyPart(filePart);
+        final Entity<MultiPart> requestEntity = Entity.entity(formData, MediaType.MULTIPART_FORM_DATA);
+
+        final Response uploadResponse;
+        try {
+            uploadResponse = login.doAction(new PrivilegedAction<Response>() {
+                @Override
+                public Response run() {
+                    return uploadTarget.request().post(requestEntity, Response.class);
+                }
+            });
+        } catch (LoginException loginFailure) {
+            throw new RegistryRetryableException(loginFailure);
+        }
+
+        return handleSchemaIdVersionResponse(metadataInfo, uploadResponse);
+    }
+
+    private SchemaDigestEntry buildSchemaTextEntry(SchemaVersion schemaVersion, String name) {
+        byte[] digest;
+        try {
+            digest = MessageDigest.getInstance("MD5").digest(schemaVersion.getSchemaText().getBytes("UTF-8"));
+        } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
+            throw new RuntimeException(e.getMessage(), e);
+        }
+
+        // storing schema text string is expensive, so storing digest in cache's key.
+        return new SchemaDigestEntry(name, digest);
+    }
+
+    /**
+     * Adds a version to the named schema on the master branch; delegates to the branch-aware
+     * overload with {@code SchemaBranch.MASTER_BRANCH}.
+     */
+    @Override
+    public SchemaIdVersion addSchemaVersion(final String schemaName, final SchemaVersion schemaVersion, boolean disableCanonicalCheck)
+            throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+        return addSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaName, schemaVersion, disableCanonicalCheck);
+    }
+
+    @Override
+    public SchemaIdVersion addSchemaVersion(final String schemaBranchName, final String schemaName, final SchemaVersion schemaVersion, boolean disableCanonicalCheck)
+            throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+
+        try {
+            return schemaTextCache.get(buildSchemaTextEntry(schemaVersion, schemaName),
+                    () -> doAddSchemaVersion(schemaBranchName, schemaName, schemaVersion, disableCanonicalCheck));
+        } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            LOG.error("Encountered error while adding new version [{}] of schema [{}] and error [{}]", schemaVersion, schemaName, e);
+            if (cause != null) {
+                if (cause instanceof InvalidSchemaException)
+                    throw (InvalidSchemaException) cause;
+                else if (cause instanceof IncompatibleSchemaException) {
+                    throw (IncompatibleSchemaException) cause;
+                } else if (cause instanceof SchemaNotFoundException) {
+                    throw (SchemaNotFoundException) cause;
+                } else {
+                    throw new RuntimeException(cause.getMessage(), cause);
+                }
+            } else {
+                throw new RuntimeException(e.getMessage(), e);
+            }
+        }
+    }
+
+    /**
+     * Deletes the schema version identified by schema name and version number, after
+     * invalidating its entry in the schema version cache.
+     *
+     * @throws RegistryRetryableException if the request cannot be run because login failed
+     */
+    @Override
+    public void deleteSchemaVersion(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException, SchemaLifecycleException {
+        schemaVersionInfoCache.invalidateSchema(new SchemaVersionInfoCache.Key(schemaVersionKey));
+
+        final String versionPath = String.format("%s/versions/%s", schemaVersionKey
+                .getSchemaName(), schemaVersionKey.getVersion());
+        final WebTarget versionTarget = currentSchemaRegistryTargets().schemasTarget.path(versionPath);
+
+        final Response deleteResponse;
+        try {
+            deleteResponse = login.doAction(new PrivilegedAction<Response>() {
+                @Override
+                public Response run() {
+                    return versionTarget.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
+                }
+            });
+        } catch (LoginException loginFailure) {
+            throw new RegistryRetryableException(loginFailure);
+        }
+
+        handleDeleteSchemaResponse(deleteResponse);
+    }
+
+    /**
+     * Translates the response of a schema version delete request into an exception, if any.
+     *
+     * Status codes are compared as integers rather than through
+     * {@code Response.Status.fromStatusCode}, which returns {@code null} for codes that are not
+     * part of the enum and would make a {@code switch} fail with a NullPointerException.
+     *
+     * @param response the response of the delete request; its entity is read as a String
+     * @throws SchemaNotFoundException on 404
+     * @throws SchemaLifecycleException on 400
+     * @throws RuntimeException on 500
+     */
+    private void handleDeleteSchemaResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException {
+        String msg = response.readEntity(String.class);
+        int status = response.getStatus();
+        if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+            throw new SchemaNotFoundException(msg);
+        } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new SchemaLifecycleException(msg);
+        } else if (status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
+            throw new RuntimeException(msg);
+        }
+        // Every other status is not treated as an error here.
+    }
+
+    /**
+     * Posts the given schema version to the registry for an existing schema on the given branch.
+     *
+     * @throws SchemaNotFoundException if no metadata exists for the schema name
+     * @throws RegistryRetryableException if the request cannot be run because login failed
+     */
+    private SchemaIdVersion doAddSchemaVersion(String schemaBranchName, String schemaName,
+                                               SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws IncompatibleSchemaException, InvalidSchemaException, SchemaNotFoundException {
+        final SchemaMetadataInfo metadataInfo = getSchemaMetadataInfo(schemaName);
+        if (metadataInfo == null) {
+            throw new SchemaNotFoundException("Schema with name " + schemaName + " not found");
+        }
+
+        final WebTarget versionsTarget = currentSchemaRegistryTargets().schemasTarget
+                .path(schemaName)
+                .path("/versions")
+                .queryParam("branch", schemaBranchName)
+                .queryParam("disableCanonicalCheck", disableCanonicalCheck);
+
+        final Response addResponse;
+        try {
+            addResponse = login.doAction(new PrivilegedAction<Response>() {
+                @Override
+                public Response run() {
+                    return versionsTarget.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(schemaVersion), Response.class);
+                }
+            });
+        } catch (LoginException loginFailure) {
+            throw new RegistryRetryableException(loginFailure);
+        }
+
+        return handleSchemaIdVersionResponse(metadataInfo, addResponse);
+    }
+
+    private SchemaIdVersion handleSchemaIdVersionResponse(SchemaMetadataInfo schemaMetadataInfo,
+                                                          Response response) throws IncompatibleSchemaException, InvalidSchemaException {
+        int status = response.getStatus();
+        String msg = response.readEntity(String.class);
+        if (status == Response.Status.BAD_REQUEST.getStatusCode() || status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
+            CatalogResponse catalogResponse = readCatalogResponse(msg);
+            if (CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode() == catalogResponse.getResponseCode()) {
+                throw new IncompatibleSchemaException(catalogResponse.getResponseMessage());
+            } else if (CatalogResponse.ResponseMessage.INVALID_SCHEMA.getCode() == catalogResponse.getResponseCode()) {
+                throw new InvalidSchemaException(catalogResponse.getResponseMessage());
+            } else {
+                throw new RuntimeException(catalogResponse.getResponseMessage());
+            }
+
+        }
+
+        Integer version = readEntity(msg, Integer.class);
+
+        SchemaVersionInfo schemaVersionInfo = doGetSchemaVersionInfo(new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata()
+                .getName(), version));
+
+        return new SchemaIdVersion(schemaMetadataInfo.getId(), version, schemaVersionInfo.getId());
+    }
+
+    /**
+     * Parses the given JSON text into a {@link CatalogResponse}.
+     *
+     * @param msg JSON text of a catalog response
+     * @return the parsed catalog response
+     * @throws RuntimeException wrapping the {@link IOException} if the text cannot be parsed
+     */
+    public static CatalogResponse readCatalogResponse(String msg) {
+        final ObjectMapper mapper = new ObjectMapper();
+        try {
+            final JsonNode tree = mapper.readTree(msg);
+            return mapper.treeToValue(tree, CatalogResponse.class);
+        } catch (IOException parseFailure) {
+            throw new RuntimeException(parseFailure);
+        }
+    }
+
+    /**
+     * Returns the schema version info for the given id/version through the schema version cache.
+     *
+     * @throws SchemaNotFoundException if the cache lookup reports the schema as not found
+     * @throws RuntimeException wrapping any other exception raised by the lookup
+     */
+    public SchemaVersionInfo getSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
+        try {
+            return schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
+        } catch (Exception lookupFailure) {
+            // "Not found" keeps its type; everything else is wrapped.
+            if (lookupFailure instanceof SchemaNotFoundException) {
+                throw (SchemaNotFoundException) lookupFailure;
+            }
+            throw new RuntimeException(lookupFailure);
+        }
+    }
+
+    /**
+     * Returns the latest version of the named schema on the master branch; delegates to the
+     * branch-aware overload with {@code SchemaBranch.MASTER_BRANCH}.
+     */
+    @Override
+    public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaName) throws SchemaNotFoundException {
+        return getLatestSchemaVersionInfo(SchemaBranch.MASTER_BRANCH, schemaName);
+    }
+
+    /**
+     * Returns the schema version info for the given name/version key through the schema version cache.
+     *
+     * @throws SchemaNotFoundException if the cache lookup reports the schema as not found
+     * @throws RuntimeException wrapping any other exception raised by the lookup
+     */
+    @Override
+    public SchemaVersionInfo getSchemaVersionInfo(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException {
+        try {
+            return schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey));
+        } catch (Exception lookupFailure) {
+            // "Not found" keeps its type; everything else is wrapped.
+            if (lookupFailure instanceof SchemaNotFoundException) {
+                throw (SchemaNotFoundException) lookupFailure;
+            }
+            throw new RuntimeException(lookupFailure);
+        }
+    }
+
+    private SchemaVersionInfo doGetSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
+        if (schemaIdVersion.getSchemaVersionId() != null) {
+            LOG.info("Getting schema version from target registry for [{}]", schemaIdVersion.getSchemaVersionId());
+            return getEntity(currentSchemaRegistryTargets()
+                            .schemaVersionsByIdTarget
+                            .path(schemaIdVersion.getSchemaVersionId().toString()),
+                    SchemaVersionInfo.class);
+        } else if (schemaIdVersion.getSchemaMetadataId() != null) {
+            SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaIdVersion.getSchemaMetadataId());
+            SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata()
+                    .getName(), schemaIdVersion.getVersion());
+            LOG.info("Getting schema version from target registry for key [{}]", schemaVersionKey);
+            return doGetSchemaVersionInfo(schemaVersionKey);
+        }
+
+        throw new IllegalArgumentException("Given argument not valid: " + schemaIdVersion);
+    }
+
+    private SchemaVersionInfo doGetSchemaVersionInfo(SchemaVersionKey schemaVersionKey) {
+        LOG.info("Getting schema version from target registry for [{}]", schemaVersionKey);
+        String schemaName = schemaVersionKey.getSchemaName();
+        WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%d", schemaName, schemaVersionKey
+                .getVersion()));
+
+        return getEntity(webTarget, SchemaVersionInfo.class);
+    }
+
+    /**
+     * Fetches the latest version of the named schema on the given branch from the registry.
+     * The schema name is URL-encoded before it is placed in the request path.
+     */
+    @Override
+    public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaBranchName, String schemaName) throws SchemaNotFoundException {
+        final WebTarget schemas = currentSchemaRegistryTargets().schemasTarget;
+        final WebTarget latestTarget = schemas
+                .path(encode(schemaName) + "/versions/latest")
+                .queryParam("branch", schemaBranchName);
+        return getEntity(latestTarget, SchemaVersionInfo.class);
+    }
+
+    /**
+     * Returns all versions of the named schema on the master branch; delegates to the
+     * branch-aware overload with {@code SchemaBranch.MASTER_BRANCH}.
+     */
+    @Override
+    public Collection<SchemaVersionInfo> getAllVersions(String schemaName) throws SchemaNotFoundException {
+        return getAllVersions(SchemaBranch.MASTER_BRANCH, schemaName);
+    }
+
+    private static String encode(String schemaName) {
+        try {
+            return URLEncoder.encode(schemaName, "UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Requests the "enable" transition for the given schema version.
+     *
+     * A {@link SchemaLifecycleException} caused by an {@link IncompatibleSchemaException} is
+     * unwrapped so the caller sees the incompatibility directly.
+     */
+    @Override
+    public void enableSchemaVersion(Long schemaVersionId)
+            throws SchemaNotFoundException, SchemaLifecycleException, IncompatibleSchemaException {
+        try {
+            transitionSchemaVersionState(schemaVersionId, "enable", null);
+        } catch (SchemaLifecycleException lifecycleFailure) {
+            final Throwable rootCause = lifecycleFailure.getCause();
+            // instanceof is false for null, so no separate null check is needed.
+            if (rootCause instanceof IncompatibleSchemaException) {
+                throw (IncompatibleSchemaException) rootCause;
+            }
+            throw lifecycleFailure;
+        }
+    }
+
+    /**
+     * Requests the "disable" transition for the given schema version, with no transition details.
+     */
+    @Override
+    public void disableSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
+        transitionSchemaVersionState(schemaVersionId, "disable", null);
+    }
+
+    /**
+     * Requests the "delete" transition for the given schema version, with no transition details.
+     */
+    @Override
+    public void deleteSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
+        transitionSchemaVersionState(schemaVersionId, "delete", null);
+    }
+
+    /**
+     * Requests the "archive" transition for the given schema version, with no transition details.
+     */
+    @Override
+    public void archiveSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
+        transitionSchemaVersionState(schemaVersionId, "archive", null);
+    }
+
+    /**
+     * Requests the "startReview" transition for the given schema version, with no transition details.
+     */
+    @Override
+    public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
+        transitionSchemaVersionState(schemaVersionId, "startReview", null);
+    }
+
+    /**
+     * Asks the registry to merge the given schema version.
+     *
+     * @return the merge result parsed from a 200 response
+     * @throws SchemaNotFoundException on 404
+     * @throws IncompatibleSchemaException on 400
+     * @throws RegistryRetryableException if the request cannot be run because login failed
+     * @throws RuntimeException on any other non-200 status
+     */
+    @Override
+    public SchemaVersionMergeResult mergeSchemaVersion(Long schemaVersionId, boolean disableCanonicalCheck) throws SchemaNotFoundException, IncompatibleSchemaException {
+        final WebTarget mergeTarget = currentSchemaRegistryTargets().schemasTarget
+                .path(schemaVersionId + "/merge")
+                .queryParam("disableCanonicalCheck", disableCanonicalCheck);
+
+        final Response mergeResponse;
+        try {
+            mergeResponse = login.doAction(new PrivilegedAction<Response>() {
+                @Override
+                public Response run() {
+                    return mergeTarget.request().post(null);
+                }
+            });
+        } catch (LoginException loginFailure) {
+            throw new RegistryRetryableException(loginFailure);
+        }
+
+        final int httpStatus = mergeResponse.getStatus();
+        if (httpStatus == Response.Status.NOT_FOUND.getStatusCode()) {
+            throw new SchemaNotFoundException(mergeResponse.readEntity(String.class));
+        }
+        if (httpStatus == Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new IncompatibleSchemaException(mergeResponse.readEntity(String.class));
+        }
+        if (httpStatus != Response.Status.OK.getStatusCode()) {
+            throw new RuntimeException(mergeResponse.readEntity(String.class));
+        }
+
+        final String body = mergeResponse.readEntity(String.class);
+        return readEntity(body, SchemaVersionMergeResult.class);
+    }
+
+    /**
+     * Requests a transition of the given schema version to the target state.
+     *
+     * The target state id is passed on in its String form. The value returned by the
+     * underlying transition call is not reported to the caller.
+     *
+     * @param schemaVersionId   id of the schema version to transition
+     * @param targetStateId     id of the state to transition to
+     * @param transitionDetails details passed along with the transition request
+     */
+    @Override
+    public void transitionState(Long schemaVersionId,
+                                Byte targetStateId,
+                                byte[] transitionDetails) throws SchemaNotFoundException, SchemaLifecycleException {
+        transitionSchemaVersionState(schemaVersionId, targetStateId.toString(), transitionDetails);
+    }
+
+    /**
+     * Fetches the schema version lifecycle state machine info from the currently selected registry.
+     */
+    @Override
+    public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo() {
+        return getEntity(currentSchemaRegistryTargets().schemaVersionsStatesMachineTarget,
+                SchemaVersionLifecycleStateMachineInfo.class);
+    }
+
+    /**
+     * Creates a schema branch rooted at the given schema version.
+     *
+     * @return the branch returned by the registry on a 200 response
+     * @throws SchemaNotFoundException on 400
+     * @throws SchemaBranchAlreadyExistsException on 409
+     * @throws RegistryRetryableException if the request cannot be run because login failed
+     * @throws RuntimeException on any other non-200 status
+     */
+    @Override
+    public SchemaBranch createSchemaBranch(Long schemaVersionId, SchemaBranch schemaBranch) throws SchemaBranchAlreadyExistsException, SchemaNotFoundException {
+        final WebTarget branchTarget = currentSchemaRegistryTargets().schemasTarget
+                .path("versionsById/" + schemaVersionId + "/branch");
+
+        final Response branchResponse;
+        try {
+            branchResponse = login.doAction(new PrivilegedAction<Response>() {
+                @Override
+                public Response run() {
+                    return branchTarget.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(schemaBranch), Response.class);
+                }
+            });
+        } catch (LoginException loginFailure) {
+            throw new RegistryRetryableException(loginFailure);
+        }
+
+        final int httpStatus = branchResponse.getStatus();
+        if (httpStatus == Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new SchemaNotFoundException(branchResponse.readEntity(String.class));
+        }
+        if (httpStatus == Response.Status.CONFLICT.getStatusCode()) {
+            throw new SchemaBranchAlreadyExistsException(branchResponse.readEntity(String.class));
+        }
+        if (httpStatus != Response.Status.OK.getStatusCode()) {
+            throw new RuntimeException(branchResponse.readEntity(String.class));
+        }
+
+        final String body = branchResponse.readEntity(String.class);
+        return readEntity(body, SchemaBranch.class);
+    }
+
+    /**
+     * Lists the branches of the named schema via a GET on {@code <encoded schemaName>/branches}.
+     *
+     * @param schemaName name of the schema whose branches are requested
+     * @return the branches parsed from the response body when the registry answers HTTP 200
+     * @throws SchemaNotFoundException    when the registry answers HTTP 404
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     * @throws RuntimeException           for any other non-200 status
+     */
+    @Override
+    public Collection<SchemaBranch> getSchemaBranches(String schemaName) throws SchemaNotFoundException {
+        final WebTarget branchesTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/branches");
+        final PrivilegedAction<Response> listRequest = () -> branchesTarget.request().get();
+
+        final Response response;
+        try {
+            response = login.doAction(listRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+
+        final int statusCode = response.getStatus();
+        if (statusCode == Response.Status.OK.getStatusCode()) {
+            return parseResponseAsEntities(response.readEntity(String.class), SchemaBranch.class);
+        }
+        if (statusCode == Response.Status.NOT_FOUND.getStatusCode()) {
+            throw new SchemaNotFoundException(response.readEntity(String.class));
+        }
+        throw new RuntimeException(response.readEntity(String.class));
+    }
+
+    /**
+     * Deletes a schema branch by issuing a DELETE on {@code branch/<schemaBranchId>}.
+     *
+     * @param schemaBranchId id of the branch to delete
+     * @throws SchemaBranchNotFoundException        when the registry answers HTTP 404
+     * @throws InvalidSchemaBranchDeletionException when the registry answers HTTP 400
+     * @throws RegistryRetryableException           when the login action fails with a {@link LoginException}
+     * @throws RuntimeException                     for any other non-200 status
+     */
+    @Override
+    public void deleteSchemaBranch(Long schemaBranchId) throws SchemaBranchNotFoundException, InvalidSchemaBranchDeletionException {
+        final WebTarget branchTarget = currentSchemaRegistryTargets().schemasTarget.path("branch/" + schemaBranchId);
+        final PrivilegedAction<Response> deleteRequest = () -> branchTarget.request().delete();
+
+        final Response response;
+        try {
+            response = login.doAction(deleteRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+
+        final int statusCode = response.getStatus();
+        if (statusCode == Response.Status.OK.getStatusCode()) {
+            return;
+        }
+
+        // Every failure path reports the response body as the exception message.
+        final String errorBody = response.readEntity(String.class);
+        if (statusCode == Response.Status.NOT_FOUND.getStatusCode()) {
+            throw new SchemaBranchNotFoundException(errorBody);
+        }
+        if (statusCode == Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new InvalidSchemaBranchDeletionException(errorBody);
+        }
+        throw new RuntimeException(errorBody);
+    }
+
+    /**
+     * Retrieves the versions of a schema on a branch, passing the given state ids as the
+     * {@code states} query parameter.
+     *
+     * @param schemaBranchName value of the {@code branch} query parameter
+     * @param schemaName       schema name; encoded before being placed in the request path
+     * @param stateIds         state ids sent as the {@code states} query parameter; must not be {@code null}
+     * @return the schema versions parsed from the response
+     */
+    @Override
+    public Collection<SchemaVersionInfo> getAllVersions(String schemaBranchName, String schemaName, List<Byte> stateIds) throws SchemaNotFoundException, SchemaBranchNotFoundException {
+        final WebTarget versionsTarget = currentSchemaRegistryTargets().schemasTarget
+                .path(encode(schemaName) + "/versions")
+                .queryParam("branch", schemaBranchName)
+                .queryParam("states", stateIds.toArray());
+        return getEntities(versionsTarget, SchemaVersionInfo.class);
+    }
+
+    /**
+     * POSTs a lifecycle transition for a schema version to
+     * {@code <schemaVersionId>/state/<operationOrTargetState>} and, once the response has been
+     * handled without an exception, invalidates the cached entry for that version.
+     *
+     * @param schemaVersionId        id of the schema version to transition
+     * @param operationOrTargetState last path segment of the request
+     * @param transitionDetails      payload sent as the text entity of the request
+     * @return the boolean read from the response by {@code handleSchemaLifeCycleResponse}
+     * @throws SchemaNotFoundException    propagated from response handling
+     * @throws SchemaLifecycleException   propagated from response handling
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    private boolean transitionSchemaVersionState(Long schemaVersionId,
+                                                 String operationOrTargetState,
+                                                 byte[] transitionDetails) throws SchemaNotFoundException, SchemaLifecycleException {
+
+        final WebTarget stateTarget = currentSchemaRegistryTargets().schemaVersionsTarget.path(schemaVersionId + "/state/" + operationOrTargetState);
+        final PrivilegedAction<Response> transitionRequest = () -> stateTarget.request().post(Entity.text(transitionDetails));
+
+        final Response response;
+        try {
+            response = login.doAction(transitionRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+
+        final boolean transitioned = handleSchemaLifeCycleResponse(response);
+
+        // Drop the cached copy of this version; reached only when response handling did not throw.
+        final SchemaIdVersion versionKey = new SchemaIdVersion(schemaVersionId);
+        schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(versionKey));
+
+        return transitioned;
+    }
+
+    /**
+     * Translates the HTTP response of a lifecycle operation into a result or an exception.
+     *
+     * @param response response returned by the registry
+     * @return the boolean entity of the response when the status is HTTP 200
+     * @throws SchemaNotFoundException  when the status is HTTP 404
+     * @throws SchemaLifecycleException when the status is HTTP 400; wraps an
+     *                                  {@link IncompatibleSchemaException} if the catalog response
+     *                                  code equals the {@code INCOMPATIBLE_SCHEMA} code
+     * @throws RuntimeException         for any other status
+     */
+    private boolean handleSchemaLifeCycleResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException {
+        final int statusCode = response.getStatus();
+
+        if (statusCode == Response.Status.OK.getStatusCode()) {
+            return response.readEntity(Boolean.class);
+        }
+        if (statusCode == Response.Status.NOT_FOUND.getStatusCode()) {
+            throw new SchemaNotFoundException(response.readEntity(String.class));
+        }
+        if (statusCode != Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new RuntimeException(response.readEntity(String.class));
+        }
+
+        // HTTP 400: the body is a catalog response whose code tells incompatibility apart from other failures.
+        final CatalogResponse catalogResponse = readCatalogResponse(response.readEntity(String.class));
+        if (catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode()) {
+            throw new SchemaLifecycleException(new IncompatibleSchemaException(catalogResponse.getResponseMessage()));
+        }
+        throw new SchemaLifecycleException(catalogResponse.getResponseMessage());
+    }
+
+    /**
+     * Retrieves the versions of a schema on the given branch.
+     *
+     * @param schemaBranchName value of the {@code branch} query parameter
+     * @param schemaName       schema name; encoded before being placed in the request path
+     * @return the schema versions parsed from the response
+     */
+    @Override
+    public Collection<SchemaVersionInfo> getAllVersions(String schemaBranchName, String schemaName) throws SchemaNotFoundException {
+        final WebTarget versionsTarget = currentSchemaRegistryTargets().schemasTarget
+                .path(encode(schemaName) + "/versions")
+                .queryParam("branch", schemaBranchName);
+        return getEntities(versionsTarget, SchemaVersionInfo.class);
+    }
+
+    /**
+     * Checks compatibility of the given schema text against the schema on
+     * {@link SchemaBranch#MASTER_BRANCH}.
+     *
+     * @param schemaName   name of the schema to check against
+     * @param toSchemaText schema text to be checked
+     * @return the compatibility result reported by the branch-aware overload
+     */
+    @Override
+    public CompatibilityResult checkCompatibility(String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
+        final String branchName = SchemaBranch.MASTER_BRANCH;
+        return checkCompatibility(branchName, schemaName, toSchemaText);
+    }
+
+    /**
+     * POSTs the schema text to {@code <encoded schemaName>/compatibility} with the branch as a
+     * query parameter and parses the response body as a {@link CompatibilityResult}.
+     *
+     * @param schemaBranchName value of the {@code branch} query parameter
+     * @param schemaName       schema name; encoded before being placed in the request path
+     * @param toSchemaText     schema text sent as the text entity of the request
+     * @return the parsed compatibility result
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    @Override
+    public CompatibilityResult checkCompatibility(String schemaBranchName, String schemaName,
+                                                  String toSchemaText) throws SchemaNotFoundException {
+        final WebTarget compatibilityTarget = currentSchemaRegistryTargets().schemasTarget
+                .path(encode(schemaName) + "/compatibility")
+                .queryParam("branch", schemaBranchName);
+        final PrivilegedAction<String> compatibilityRequest =
+                () -> compatibilityTarget.request().post(Entity.text(toSchemaText), String.class);
+
+        final String responseBody;
+        try {
+            responseBody = login.doAction(compatibilityRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+        return readEntity(responseBody, CompatibilityResult.class);
+    }
+
+    /**
+     * Convenience overload that runs the compatibility check on {@link SchemaBranch#MASTER_BRANCH}.
+     *
+     * @param schemaName   name of the schema to check against
+     * @param toSchemaText schema text to be checked
+     * @return the result of the branch-aware overload
+     */
+    @Override
+    public boolean isCompatibleWithAllVersions(String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
+        final String branchName = SchemaBranch.MASTER_BRANCH;
+        return isCompatibleWithAllVersions(branchName, schemaName, toSchemaText);
+    }
+
+    /**
+     * Runs {@code checkCompatibility} for the given branch and schema and reduces the result
+     * to its {@code isCompatible()} flag.
+     *
+     * @param schemaBranchName branch to check on
+     * @param schemaName       name of the schema to check against
+     * @param toSchemaText     schema text to be checked
+     * @return {@code true} when the compatibility result reports compatible
+     */
+    @Override
+    public boolean isCompatibleWithAllVersions(String schemaBranchName, String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
+        final CompatibilityResult compatibility = checkCompatibility(schemaBranchName, schemaName, toSchemaText);
+        return compatibility.isCompatible();
+    }
+
+    /**
+     * Searches schemas by fields: every entry of the query's map becomes a query parameter
+     * on the search-fields target.
+     *
+     * @param schemaFieldQuery query whose {@code toQueryMap()} supplies the query parameters
+     * @return the schema version keys parsed from the response
+     */
+    @Override
+    public Collection<SchemaVersionKey> findSchemasByFields(SchemaFieldQuery schemaFieldQuery) {
+        WebTarget searchTarget = currentSchemaRegistryTargets().searchFieldsTarget;
+        for (Map.Entry<String, String> queryParam : schemaFieldQuery.toQueryMap().entrySet()) {
+            final String paramName = queryParam.getKey();
+            final String paramValue = queryParam.getValue();
+            // queryParam returns a new target, so the reference is re-assigned on every iteration.
+            searchTarget = searchTarget.queryParam(paramName, paramValue);
+        }
+
+        return getEntities(searchTarget, SchemaVersionKey.class);
+    }
+
+    /**
+     * Uploads the stream as a multipart form part named {@code file} to the files target.
+     *
+     * @param inputStream content to upload
+     * @return the response body returned by the registry, as a string
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    @Override
+    public String uploadFile(InputStream inputStream) {
+        final MultiPart multiPart = new MultiPart();
+        final BodyPart filePart = new StreamDataBodyPart("file", inputStream, "file");
+        multiPart.bodyPart(filePart);
+
+        final PrivilegedAction<String> uploadRequest = () -> currentSchemaRegistryTargets().filesTarget.request()
+                .post(Entity.entity(multiPart, MediaType.MULTIPART_FORM_DATA), String.class);
+        try {
+            return login.doAction(uploadRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+    }
+
+    /**
+     * Downloads a file via a GET on {@code download/<encoded fileId>} under the files target.
+     *
+     * @param fileId id of the file; encoded before being placed in the request path
+     * @return the response entity as an {@link InputStream}
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    @Override
+    public InputStream downloadFile(String fileId) {
+        final PrivilegedAction<InputStream> downloadRequest = () -> currentSchemaRegistryTargets().filesTarget
+                .path("download/" + encode(fileId))
+                .request()
+                .get(InputStream.class);
+        try {
+            return login.doAction(downloadRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+    }
+
+    /**
+     * Registers a serializer/deserializer pair by POSTing it to the serializers target.
+     *
+     * @param serDesPair pair sent as the JSON request entity
+     * @return the {@link Long} parsed from the response body
+     */
+    @Override
+    public Long addSerDes(SerDesPair serDesPair) {
+        final WebTarget serializersTarget = currentSchemaRegistryTargets().serializersTarget;
+        return postEntity(serializersTarget, serDesPair, Long.class);
+    }
+
+    /**
+     * Maps a schema to a serializer/deserializer by POSTing an empty JSON entity to
+     * {@code <encoded schemaName>/mapping/<serDesId>} and logging the boolean outcome.
+     *
+     * @param schemaName schema name; encoded before being placed in the request path
+     * @param serDesId   id of the serializer/deserializer; must not be {@code null}
+     */
+    @Override
+    public void mapSchemaWithSerDes(String schemaName, Long serDesId) {
+        final String mappingPath = String.format("%s/mapping/%s", encode(schemaName), serDesId.toString());
+        final WebTarget mappingTarget = currentSchemaRegistryTargets().schemasTarget.path(mappingPath);
+
+        final Boolean mapped = postEntity(mappingTarget, null, Boolean.class);
+        LOG.info("Received response while mapping schema [{}] with serialzer/deserializer [{}] : [{}]", schemaName, serDesId, mapped);
+    }
+
+    /**
+     * Instantiates the default serializer of the first supported schema provider whose type
+     * equals the given type.
+     *
+     * @param type schema provider type to look up
+     * @return a new instance of the provider's default serializer class
+     * @throws SerDesException          when the class cannot be loaded or instantiated
+     * @throws IllegalArgumentException when no supported provider has the given type
+     */
+    @Override
+    public <T> T getDefaultSerializer(String type) throws SerDesException {
+        for (SchemaProviderInfo provider : getSupportedSchemaProviders()) {
+            if (!provider.getType().equals(type)) {
+                continue;
+            }
+            try {
+                final Class<?> serializerClass = Class.forName(provider.getDefaultSerializerClassName());
+                return (T) serializerClass.newInstance();
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                throw new SerDesException(e);
+            }
+        }
+
+        throw new IllegalArgumentException("No schema provider registered for the given type " + type);
+    }
+
+    /**
+     * Instantiates the default deserializer of the first supported schema provider whose type
+     * equals the given type.
+     *
+     * @param type schema provider type to look up
+     * @return a new instance of the provider's default deserializer class
+     * @throws SerDesException          when the class cannot be loaded or instantiated
+     * @throws IllegalArgumentException when no supported provider has the given type
+     */
+    @Override
+    public <T> T getDefaultDeserializer(String type) throws SerDesException {
+        for (SchemaProviderInfo provider : getSupportedSchemaProviders()) {
+            if (!provider.getType().equals(type)) {
+                continue;
+            }
+            try {
+                final Class<?> deserializerClass = Class.forName(provider.getDefaultDeserializerClassName());
+                return (T) deserializerClass.newInstance();
+            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                throw new SerDesException(e);
+            }
+        }
+
+        throw new IllegalArgumentException("No schema provider registered for the given type " + type);
+    }
+
+    /**
+     * Lists the serializers/deserializers of a schema via {@code <encoded schemaName>/serdes/}.
+     *
+     * @param schemaName schema name; encoded before being placed in the request path
+     * @return the serializer/deserializer infos parsed from the response
+     */
+    @Override
+    public Collection<SerDesInfo> getSerDes(String schemaName) {
+        final WebTarget serDesTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/serdes/");
+        return getEntities(serDesTarget, SerDesInfo.class);
+    }
+
+    /**
+     * Creates an instance of the serializer class named by the given info.
+     *
+     * @param serDesInfo info carrying the serializer/deserializer pair
+     * @return the instance produced by {@code createInstance} in serializer mode
+     */
+    public <T> T createSerializerInstance(SerDesInfo serDesInfo) {
+        final boolean isSerializer = true;
+        return createInstance(serDesInfo, isSerializer);
+    }
+
+    /**
+     * Creates an instance of the deserializer class named by the given info.
+     *
+     * @param serDesInfo info carrying the serializer/deserializer pair
+     * @return the instance produced by {@code createInstance} in deserializer mode
+     */
+    @Override
+    public <T> T createDeserializerInstance(SerDesInfo serDesInfo) {
+        final boolean isSerializer = false;
+        return createInstance(serDesInfo, isSerializer);
+    }
+
+    /**
+     * Closes the underlying {@code client}.
+     */
+    @Override
+    public void close() {
+        this.client.close();
+    }
+
+    /**
+     * Reports the version of this client.
+     *
+     * @return the {@code CLIENT_VERSION} constant
+     */
+    public SchemaRegistryVersion clientVersion() {
+        final SchemaRegistryVersion version = CLIENT_VERSION;
+        return version;
+    }
+
+    /**
+     * Loads and instantiates the serializer or deserializer class named in the given info,
+     * using the class loader cached for the pair's file id.
+     * <p>
+     * The class must implement at least one of the registered serializer (or deserializer)
+     * interfaces, otherwise a {@link RuntimeException} is thrown.
+     * <p>
+     * NOTE(review): the proxy built by {@code Proxy.newProxyInstance} below is never assigned
+     * or returned; the method returns the raw instance {@code t}. Presumably the proxy was meant
+     * to be returned so that calls go through {@code ClassLoaderAwareInvocationHandler} - confirm
+     * before changing, since callers would then receive a proxy instead of the concrete class.
+     *
+     * @param serDesInfo   info carrying the serializer/deserializer pair (class names and file id)
+     * @param isSerializer {@code true} to instantiate the serializer class, {@code false} for the deserializer class
+     * @return a new instance of the selected class
+     * @throws SerDesException          when the class cannot be found or instantiated
+     * @throws IllegalArgumentException when the applicable interface set is null or empty
+     */
+    private <T> T createInstance(SerDesInfo serDesInfo, boolean isSerializer) {
+        Set<Class<?>> interfaceClasses = isSerializer ? SERIALIZER_INTERFACE_CLASSES : DESERIALIZER_INTERFACE_CLASSES;
+
+        if (interfaceClasses == null || interfaceClasses.isEmpty()) {
+            throw new IllegalArgumentException("interfaceClasses array must be neither null nor empty.");
+        }
+
+        // Look up the serializer/deserializer pair; class loaders are obtained from a cache keyed by file id.
+        final SerDesPair serDesPair = serDesInfo.getSerDesPair();
+        String fileId = serDesPair.getFileId();
+        // get class loader for this file ID
+        ClassLoader classLoader = classLoaderCache.getClassLoader(fileId);
+
+        T t;
+        try {
+            String className =
+                    isSerializer ? serDesPair.getSerializerClassName() : serDesPair.getDeserializerClassName();
+
+            // Load (and initialize) the class through the file-specific class loader, then instantiate it.
+            Class<T> clazz = (Class<T>) Class.forName(className, true, classLoader);
+            t = clazz.newInstance();
+            // Collect the registered interfaces that the loaded class actually implements.
+            List<Class<?>> classes = new ArrayList<>();
+            for (Class<?> interfaceClass : interfaceClasses) {
+                if (interfaceClass.isAssignableFrom(clazz)) {
+                    classes.add(interfaceClass);
+                }
+            }
+
+            if (classes.isEmpty()) {
+                throw new RuntimeException("Given Serialize/Deserializer " + className + " class does not implement any " +
+                        "one of the registered interfaces: " + interfaceClasses);
+            }
+
+            // NOTE(review): the result of this call is discarded (see method Javadoc).
+            Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
+                    classes.toArray(new Class[classes.size()]),
+                    new ClassLoaderAwareInvocationHandler(classLoader, t));
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new SerDesException(e);
+        }
+
+        return t;
+    }
+
+    /**
+     * Issues a JSON GET against the target under the login context and parses the response
+     * body into a list of entities.
+     *
+     * @param target target to request
+     * @param clazz  element type of the returned list
+     * @return the entities parsed from the response body
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    private <T> List<T> getEntities(WebTarget target, Class<T> clazz) {
+        final PrivilegedAction<String> getRequest = () -> target.request(MediaType.APPLICATION_JSON_TYPE).get(String.class);
+
+        final String responseBody;
+        try {
+            responseBody = login.doAction(getRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+        return parseResponseAsEntities(responseBody, clazz);
+    }
+
+    /**
+     * Parses a JSON document and converts each element of its {@code entities} node to the
+     * requested type.
+     *
+     * @param response JSON text to parse
+     * @param clazz    type each element is converted to
+     * @return the converted elements, in iteration order
+     * @throws RuntimeException wrapping any exception raised while parsing or converting
+     */
+    private <T> List<T> parseResponseAsEntities(String response, Class<T> clazz) {
+        final List<T> parsed = new ArrayList<>();
+        try {
+            final ObjectMapper mapper = new ObjectMapper();
+            final JsonNode entitiesNode = mapper.readTree(response).get("entities");
+            for (Iterator<JsonNode> elements = entitiesNode.elements(); elements.hasNext(); ) {
+                parsed.add(mapper.treeToValue(elements.next(), clazz));
+            }
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+        return parsed;
+    }
+
+    /**
+     * POSTs the given object as a JSON entity under the login context and deserializes the
+     * response body to the requested type.
+     *
+     * @param target       target to post to
+     * @param json         object sent as the JSON request entity; may be {@code null}
+     * @param responseType type the response body is read as
+     * @return the deserialized response body
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    private <T> T postEntity(WebTarget target, Object json, Class<T> responseType) {
+        final PrivilegedAction<String> postRequest =
+                () -> target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(json), String.class);
+
+        final String responseBody;
+        try {
+            responseBody = login.doAction(postRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+        return readEntity(responseBody, responseType);
+    }
+
+    /**
+     * Deserializes JSON text into an instance of the given class using a fresh {@code ObjectMapper}.
+     *
+     * @param response JSON text to deserialize
+     * @param clazz    target type
+     * @return the deserialized value
+     * @throws RuntimeException wrapping any exception raised during deserialization
+     */
+    private <T> T readEntity(String response, Class<T> clazz) {
+        try {
+            return new ObjectMapper().readValue(response, clazz);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Issues a JSON GET against the target under the login context and deserializes the
+     * response body to a single instance of the requested type.
+     *
+     * @param target target to request
+     * @param clazz  type the response body is read as
+     * @return the deserialized response body
+     * @throws RegistryRetryableException when the login action fails with a {@link LoginException}
+     */
+    private <T> T getEntity(WebTarget target, Class<T> clazz) {
+        final PrivilegedAction<String> getRequest = () -> target.request(MediaType.APPLICATION_JSON_TYPE).get(String.class);
+
+        final String responseBody;
+        try {
+            responseBody = login.doAction(getRequest);
+        } catch (LoginException e) {
+            throw new RegistryRetryableException(e);
+        }
+
+        return readEntity(responseBody, clazz);
+    }
+
+    /**
+     * Client configuration: declares the known configuration entries as constants and holds the
+     * key/value pairs supplied by the caller.
+     * <p>
+     * The set of known entries is discovered reflectively from the {@link ConfigEntry}-typed
+     * fields declared on this class. Values supplied for known keys are validated; a
+     * {@code null} value supplied for a known key is replaced by that entry's default.
+     */
+    public static final class Configuration {
+        // we may want to remove schema.registry prefix from configuration properties as these are all properties
+        // given by client.
+        /**
+         * URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1
+         */
+        public static final ConfigEntry<String> SCHEMA_REGISTRY_URL =
+                ConfigEntry.mandatory("schema.registry.url",
+                        String.class,
+                        "URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1",
+                        "http://localhost:9090/api/v1",
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        /**
+         * Default path for downloaded jars to be stored.
+         */
+        public static final String DEFAULT_LOCAL_JARS_PATH = "/tmp/schema-registry/local-jars";
+
+        /**
+         * Local directory path to which downloaded jars should be copied to. For ex: /tmp/schema-registry/local-jars
+         */
+        public static final ConfigEntry<String> LOCAL_JAR_PATH =
+                ConfigEntry.optional("schema.registry.client.local.jars.path",
+                        String.class,
+                        // Previously carried a copy of the registry URL description.
+                        "Local directory path to which downloaded jars should be copied to. For ex: /tmp/schema-registry/local-jars",
+                        DEFAULT_LOCAL_JARS_PATH,
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        /**
+         * Default value for classloader cache size.
+         */
+        public static final long DEFAULT_CLASSLOADER_CACHE_SIZE = 1024L;
+
+        /**
+         * Default value for cache expiry interval in seconds.
+         */
+        public static final long DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS = 60 * 60L;
+
+        /**
+         * Maximum size of classloader cache. Default value is {@link #DEFAULT_CLASSLOADER_CACHE_SIZE}
+         * Classloaders are created for serializer/deserializer jars downloaded from schema registry and they will be locally cached.
+         */
+        public static final ConfigEntry<Number> CLASSLOADER_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.class.loader.cache.size",
+                        Integer.class,
+                        "Maximum size of classloader cache",
+                        DEFAULT_CLASSLOADER_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in classloader cache. Default value is {@link #DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS}
+         * Classloaders are created for serializer/deserializer jars downloaded from schema registry and they will be locally cached.
+         */
+        public static final ConfigEntry<Number> CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.class.loader.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in classloader cache",
+                        DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Default maximum size shared by the schema version, metadata and text caches.
+         */
+        public static final long DEFAULT_SCHEMA_CACHE_SIZE = 1024;
+
+        /**
+         * Default expiry interval (in seconds) shared by the schema version, metadata and text caches.
+         */
+        public static final long DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS = 5 * 60L;
+
+        /**
+         * Maximum size of schema version cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}
+         */
+        public static final ConfigEntry<Number> SCHEMA_VERSION_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.schema.version.cache.size",
+                        Integer.class,
+                        "Maximum size of schema version cache",
+                        DEFAULT_SCHEMA_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in schema version cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
+         */
+        public static final ConfigEntry<Number> SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.schema.version.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in schema version cache",
+                        DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Maximum size of schema metadata cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}
+         */
+        public static final ConfigEntry<Number> SCHEMA_METADATA_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.schema.metadata.cache.size",
+                        Integer.class,
+                        "Maximum size of schema metadata cache",
+                        DEFAULT_SCHEMA_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in schema metadata cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
+         */
+        public static final ConfigEntry<Number> SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.schema.metadata.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in schema metadata cache",
+                        DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Maximum size of schema text cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}.
+         * This cache has ability to store/get entries with same schema name and schema text.
+         */
+        public static final ConfigEntry<Number> SCHEMA_TEXT_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.schema.text.cache.size",
+                        Integer.class,
+                        "Maximum size of schema text cache",
+                        DEFAULT_SCHEMA_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in schema text cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
+         */
+        public static final ConfigEntry<Number> SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.schema.text.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in schema text cache.",
+                        DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Fully qualified class name of the Schema Registry URL selector.
+         * Default value is the name of {@link FailoverUrlSelector}.
+         */
+        public static final ConfigEntry<String> URL_SELECTOR_CLASS =
+                ConfigEntry.optional("schema.registry.client.url.selector",
+                        String.class,
+                        "Schema Registry URL selector class.",
+                        FailoverUrlSelector.class.getName(),
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        /**
+         * Dynamic JAAS config used for the SASL connection to Schema Registry. There is no default value.
+         */
+        public static final ConfigEntry<String> SASL_JAAS_CONFIG =
+                ConfigEntry.optional( "sasl.jaas.config",
+                        String.class,
+                        "Schema Registry Dynamic JAAS config for SASL connection.",
+                        null,
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        // connection properties
+        /**
+         * Default connection timeout on connections created while connecting to schema registry.
+         */
+        public static final int DEFAULT_CONNECTION_TIMEOUT = 30 * 1000;
+
+        /**
+         * Default read timeout on connections created while connecting to schema registry.
+         */
+        public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
+
+        private final Map<String, ?> config;
+        private final Map<String, ConfigEntry<?>> options;
+
+        /**
+         * Builds a configuration from caller-supplied properties.
+         *
+         * @param config raw properties; must not be {@code null}. Values for known keys are
+         *               validated with the entry's validator, and {@code null} values for known
+         *               keys are replaced by the entry's default. Unknown keys are kept as-is.
+         */
+        public Configuration(Map<String, ?> config) {
+            Field[] fields = this.getClass().getDeclaredFields();
+            // options must be populated first: buildConfig looks entries up in it.
+            this.options = Collections.unmodifiableMap(buildOptions(fields));
+            this.config = buildConfig(config);
+        }
+
+        /**
+         * Copies the supplied properties, validating values of known keys and substituting
+         * defaults for {@code null} values of known keys.
+         */
+        private Map<String, ?> buildConfig(Map<String, ?> config) {
+            Map<String, Object> result = new HashMap<>();
+            for (Map.Entry<String, ?> entry : config.entrySet()) {
+                String key = entry.getKey();
+                Object value = entry.getValue();
+
+                ConfigEntry configEntry = options.get(key);
+                if (configEntry != null) {
+                    if (value != null) {
+                        configEntry.validator().validate((value));
+                    } else {
+                        value = configEntry.defaultValue();
+                    }
+                }
+                result.put(key, value);
+            }
+
+            return result;
+        }
+
+        /**
+         * Collects every {@link ConfigEntry}-typed field among the given fields, keyed by entry name.
+         */
+        private Map<String, ConfigEntry<?>> buildOptions(Field[] fields) {
+            Map<String, ConfigEntry<?>> options = new HashMap<>();
+            for (Field field : fields) {
+                Class<?> type = field.getType();
+
+                // Accept fields declared as ConfigEntry or a subtype of it. The previous check was
+                // inverted and would have matched supertypes (e.g. Object) and then failed the cast.
+                if (ConfigEntry.class.isAssignableFrom(type)) {
+                    field.setAccessible(true);
+                    try {
+                        ConfigEntry configEntry = (ConfigEntry) field.get(this);
+                        options.put(configEntry.name(), configEntry);
+                    } catch (IllegalAccessException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            return options;
+        }
+
+        /**
+         * Returns the configured value for the key, or the default of the matching known entry
+         * when the key was not supplied.
+         *
+         * @param propertyKey configuration key
+         * @return the supplied value, or the entry default
+         * @throws NullPointerException when the key was neither supplied nor matches a known entry
+         */
+        public <T> T getValue(String propertyKey) {
+            return (T) (config.containsKey(propertyKey) ? config.get(propertyKey)
+                    : options.get(propertyKey).defaultValue());
+        }
+
+        /**
+         * @return an unmodifiable view of the effective configuration map
+         */
+        public Map<String, Object> getConfig() {
+            return Collections.unmodifiableMap(config);
+        }
+
+        /**
+         * @return the known configuration entries declared on this class
+         */
+        public Collection<ConfigEntry<?>> getAvailableConfigEntries() {
+            return options.values();
+        }
+
+    }
+
+    /**
+     * Value object pairing a schema name with a schema digest. Equality and hash code are
+     * based on the name and on the digest bytes' contents.
+     */
+    private static class SchemaDigestEntry {
+        private final String name;
+        private final byte[] schemaDigest;
+
+        /**
+         * @param name         schema name; must not be {@code null}
+         * @param schemaDigest digest bytes; must not be {@code null}
+         */
+        SchemaDigestEntry(String name, byte[] schemaDigest) {
+            Preconditions.checkNotNull(name, "name can not be null");
+            Preconditions.checkNotNull(schemaDigest, "schema digest can not be null");
+
+            this.name = name;
+            this.schemaDigest = schemaDigest;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final SchemaDigestEntry other = (SchemaDigestEntry) o;
+            final boolean sameName = (name == null) ? other.name == null : name.equals(other.name);
+            // The digest is compared by content, not by array identity.
+            return sameName && Arrays.equals(schemaDigest, other.schemaDigest);
+        }
+
+        @Override
+        public int hashCode() {
+            final int nameHash = (name == null) ? 0 : name.hashCode();
+            return 31 * nameHash + Arrays.hashCode(schemaDigest);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index a18a5bd..0e42afc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -31,6 +31,8 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -46,6 +48,7 @@ import org.apache.nifi.util.Tuple;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
@@ -117,10 +120,66 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         .required(false)
         .build();
 
+    static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder() // must be set together with KERBEROS_PASSWORD; enforced in customValidate
+            .name("kerberos-principal")
+            .displayName("Kerberos Principal")
+            .description("The kerberos principal to authenticate with when not using the kerberos credentials service")
+            .defaultValue(null)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
+    static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder() // sensitive; must be set together with KERBEROS_PRINCIPAL (see customValidate)
+            .name("kerberos-password")
+            .displayName("Kerberos Password")
+            .description("The password for the kerberos principal when not using the kerberos credentials service")
+            .defaultValue(null)
+            .required(false)
+            .sensitive(true)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private volatile boolean usingKerberosWithPassword = false; // written by enable(...); selects the client implementation in getClient()
     private volatile SchemaRegistryClient schemaRegistryClient;
     private volatile boolean initialized;
     private volatile Map<String, Object> schemaRegistryConfig;
 
+    /**
+     * Validates the Kerberos properties as a group: a principal and a password must be given together,
+     * and a complete principal/password pair cannot be configured alongside the kerberos credentials service.
+     */
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final String principal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String password = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
+        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)
+                .asControllerService(KerberosCredentialsService.class);
+
+        final boolean hasPrincipal = !StringUtils.isBlank(principal);
+        final boolean hasPassword = !StringUtils.isBlank(password);
+        final List<ValidationResult> problems = new ArrayList<>();
+
+        if (credentialsService != null && hasPrincipal && hasPassword) {
+            problems.add(invalidKerberosSetting(KERBEROS_CREDENTIALS_SERVICE,
+                    "kerberos principal/password and kerberos credential service cannot be configured at the same time"));
+        }
+        if (hasPrincipal && !hasPassword) {
+            problems.add(invalidKerberosSetting(KERBEROS_PASSWORD, "kerberos password is required when specifying a kerberos principal"));
+        }
+        if (!hasPrincipal && hasPassword) {
+            problems.add(invalidKerberosSetting(KERBEROS_PRINCIPAL, "kerberos principal is required when specifying a kerberos password"));
+        }
+        return problems;
+    }
+
+    /** Builds a failed validation result whose subject is the display name of the given property. */
+    private static ValidationResult invalidKerberosSetting(final PropertyDescriptor property, final String explanation) {
+        return new ValidationResult.Builder()
+                .subject(property.getDisplayName())
+                .valid(false)
+                .explanation(explanation)
+                .build();
+    }
 
     @OnEnabled
     public void enable(final ConfigurationContext context) throws InitializationException {
@@ -146,17 +205,26 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
             schemaRegistryConfig.put(CLIENT_SSL_PROPERTY_PREFIX, sslProperties);
         }
 
+        final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
+        usingKerberosWithPassword = false; // cleared on every enable so a re-enabled, reconfigured service does not keep the previous mode
         final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE)
                 .asControllerService(KerberosCredentialsService.class);
+
         if (kerberosCredentialsService != null) {
             final String principal = kerberosCredentialsService.getPrincipal();
             final String keytab = kerberosCredentialsService.getKeytab();
-            final String jaasConfigString = getJaasConfig(principal, keytab);
+            final String jaasConfigString = getKeytabJaasConfig(principal, keytab);
             schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SASL_JAAS_CONFIG.name(), jaasConfigString);
+        } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
+            schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL, kerberosPrincipal);
+            schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD, kerberosPassword);
+            schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER, getLogger());
+            usingKerberosWithPassword = true;
         }
     }
 
-    private String getJaasConfig(final String principal, final String keytab) {
+    private String getKeytabJaasConfig(final String principal, final String keytab) {
         return "com.sun.security.auth.module.Krb5LoginModule required "
                 + "useTicketCache=false "
                 + "renewTicket=true "
@@ -205,20 +273,25 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
         properties.add(CACHE_EXPIRATION);
         properties.add(SSL_CONTEXT_SERVICE);
         properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_PASSWORD);
         return properties;
     }
 
 
     protected synchronized SchemaRegistryClient getClient() {
         if (!initialized) {
-            schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
+            // choose the client type matching the Kerberos mode recorded by enable(...):
+            // the password variant logs in with the configured principal/password
+            schemaRegistryClient = usingKerberosWithPassword
+                    ? new SchemaRegistryClientWithKerberosPassword(schemaRegistryConfig)
+                    : new SchemaRegistryClient(schemaRegistryConfig);
             initialized = true;
         }
 
         return schemaRegistryClient;
     }
 
-
     private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName, final String branchName)
             throws org.apache.nifi.schema.access.SchemaNotFoundException {
         try {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/KerberosUserLogin.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/KerberosUserLogin.java
new file mode 100644
index 0000000..d73f348
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/KerberosUserLogin.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.hortonworks;
+
+import com.hortonworks.registries.auth.Login;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+/**
+ * Adapts NiFi's {@link KerberosUser} to the {@link Login} interface expected by Schema Registry.
+ */
+public class KerberosUserLogin implements Login {
+
+    private final KerberosUser kerberosUser;
+    private final ComponentLog logger;
+
+    public KerberosUserLogin(final KerberosUser kerberosUser, final ComponentLog logger) {
+        this.logger = logger;
+        this.kerberosUser = kerberosUser;
+    }
+
+    /** No-op: nothing is read from the supplied configs or login context name. */
+    @Override
+    public void configure(Map<String, ?> configs, String loginContextName) {
+    }
+
+    /**
+     * Logs in the wrapped user and returns {@code null}, because the KerberosUser doesn't expose the
+     * LoginContext and SchemaRegistryClient doesn't use the returned context at all.
+     */
+    @Override
+    public LoginContext login() throws LoginException {
+        kerberosUser.login();
+        return null;
+    }
+
+    /** Runs the action through a {@link KerberosAction} built from the wrapped user and the component logger. */
+    @Override
+    public <T> T doAction(PrivilegedAction<T> action) throws LoginException {
+        final PrivilegedExceptionAction<T> exceptionAction = () -> action.run();
+        return new KerberosAction<T>(kerberosUser, exceptionAction, logger).execute();
+    }
+
+    /** No-op: this class does not log the wrapped user out. */
+    @Override
+    public void close() {
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
new file mode 100644
index 0000000..ca84f8f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.hortonworks;
+
+import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+import java.util.Map;
+
+/**
+ * Extend the SchemaRegistryClient so we can override the initialization of the security context and use
+ * the KerberosUserLogin implementation that lets us login with a principal/password.
+ */
+public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClient {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryClientWithKerberosPassword.class);
+
+    public static final String SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL = "schema.registry.client.kerberos.principal";
+    public static final String SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD = "schema.registry.client.kerberos.password";
+    public static final String SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER = "schema.registry.client.nifi.component.logger";
+
+    private KerberosUser kerberosUser;
+
+    public SchemaRegistryClientWithKerberosPassword(final Map<String, ?> conf) {
+        super(conf);
+    }
+
+    /** Logs in with the principal/password from the configuration; an IllegalArgumentException signals a missing or invalid setting. */
+    @Override
+    protected void initializeSecurityContext() {
+        final String principal = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
+        if (principal == null) {
+            throw new IllegalArgumentException("Failed to login because principal is null");
+        }
+        final String password = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
+        if (password == null) {
+            throw new IllegalArgumentException("Failed to login because password is null");
+        }
+        final Object loggerObject = configuration.getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
+        if (loggerObject == null) {
+            throw new IllegalArgumentException("Failed to login because component logger is required");
+        }
+        if (!(loggerObject instanceof ComponentLog)) {
+            throw new IllegalArgumentException("Failed to login because logger object is not a ComponentLog");
+        }
+
+        kerberosUser = new KerberosPasswordUser(principal, password);
+        login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
+        try {
+            login.login();
+        } catch (LoginException e) {
+            // pass the principal itself: wrapped in an Object[] it was bound as one argument and rendered as an array
+            LOGGER.error("Failed to login as principal `{}`", principal, e);
+        }
+    }
+
+    /** Logs out the Kerberos user, if one was created and not yet released, and then closes the underlying client. */
+    @Override
+    public void close() {
+        if (kerberosUser != null) {
+            try {
+                kerberosUser.logout();
+            } catch (Throwable t) {
+                LOGGER.error("Error performing logout of principal during close(): " + t.getMessage(), t);
+            } finally {
+                kerberosUser = null;
+            }
+        }
+        super.close();
+    }
+}