You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by js...@apache.org on 2020/03/02 17:50:31 UTC
[nifi] branch master updated: NIFI-7026 Add kerberos password
property to NiFi HortonworksSchemaRegistry
This is an automated email from the ASF dual-hosted git repository.
jstorck pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 8fa855c NIFI-7026 Add kerberos password property to NiFi HortonworksSchemaRegistry
new c74e713 Merge pull request #4071 from bbende/NIFI-7026
8fa855c is described below
commit 8fa855c8c3947151213bae830721c220ef20ef20
Author: Bryan Bende <bb...@apache.org>
AuthorDate: Wed Feb 19 14:40:13 2020 -0500
NIFI-7026 Add kerberos password property to NiFi HortonworksSchemaRegistry
---
NOTICE | 5 +
.../nifi/security/krb/KerberosPasswordUser.java | 65 +-
.../nifi/security/krb/PasswordConfiguration.java | 46 +
.../krb/UsernamePasswordCallbackHandler.java | 57 +
.../nifi-hwx-schema-registry-service/pom.xml | 151 +-
.../client/SchemaRegistryClient.java | 1498 ++++++++++++++++++++
.../hortonworks/HortonworksSchemaRegistry.java | 81 +-
.../hortonworks/KerberosUserLogin.java | 69 +
.../SchemaRegistryClientWithKerberosPassword.java | 90 ++
9 files changed, 1861 insertions(+), 201 deletions(-)
diff --git a/NOTICE b/NOTICE
index 877bf11..298543a 100644
--- a/NOTICE
+++ b/NOTICE
@@ -112,3 +112,8 @@ This includes derived works from Dropwizard Metrics available under Apache Softw
and can be found in
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JvmMetrics.java
nifi-commons/nifi-metrics/src/main/java/org/apache/nifi/metrics/jvm/JmxJvmMetrics.java
+
+This includes derived works from Cloudera Schema Registry available under Apache Software License V2 (https://github.com/hortonworks/registry)
+ Cloudera Schema Registry
+ Copyright 2016-2019 Cloudera, Inc.
+ The code can be found in nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
\ No newline at end of file
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java
index d81fc85..8f9a22a 100644
--- a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/KerberosPasswordUser.java
@@ -19,17 +19,10 @@ package org.apache.nifi.security.krb;
import org.apache.commons.lang3.Validate;
import javax.security.auth.Subject;
-import javax.security.auth.callback.Callback;
import javax.security.auth.callback.CallbackHandler;
-import javax.security.auth.callback.NameCallback;
-import javax.security.auth.callback.PasswordCallback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import javax.security.auth.login.LoginException;
-import java.io.IOException;
-import java.util.HashMap;
/**
* KerberosUser that authenticates via username and password instead of keytab.
@@ -46,65 +39,9 @@ public class KerberosPasswordUser extends AbstractKerberosUser {
@Override
protected LoginContext createLoginContext(final Subject subject) throws LoginException {
- final Configuration configuration = new PasswordConfig();
+ final Configuration configuration = new PasswordConfiguration();
final CallbackHandler callbackHandler = new UsernamePasswordCallbackHandler(principal, password);
return new LoginContext("PasswordConf", subject, callbackHandler, configuration);
}
- /**
- * JAAS Configuration to use when logging in with username/password.
- */
- private static class PasswordConfig extends Configuration {
-
- @Override
- public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
- HashMap<String, String> options = new HashMap<String, String>();
- options.put("storeKey", "true");
- options.put("refreshKrb5Config", "true");
-
- final String krbLoginModuleName = ConfigurationUtil.IS_IBM
- ? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
-
- return new AppConfigurationEntry[] {
- new AppConfigurationEntry(
- krbLoginModuleName,
- AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
- options
- )
- };
- }
-
- }
-
- /**
- * CallbackHandler that provides the given username and password.
- */
- private static class UsernamePasswordCallbackHandler implements CallbackHandler {
-
- private final String username;
- private final String password;
-
- public UsernamePasswordCallbackHandler(final String username, final String password) {
- this.username = username;
- this.password = password;
- Validate.notBlank(this.username);
- Validate.notBlank(this.password);
- }
-
- public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
- for (final Callback callback : callbacks) {
- if (callback instanceof NameCallback) {
- final NameCallback nameCallback = (NameCallback) callback;
- nameCallback.setName(username);
- } else if (callback instanceof PasswordCallback) {
- final PasswordCallback passwordCallback = (PasswordCallback) callback;
- passwordCallback.setPassword(password.toCharArray());
- } else {
- throw new IllegalStateException("Unexpected callback type: " + callback.getClass().getCanonicalName());
- }
- }
- }
-
- }
-
}
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/PasswordConfiguration.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/PasswordConfiguration.java
new file mode 100644
index 0000000..04ec7d6
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/PasswordConfiguration.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.krb;
+
+import javax.security.auth.login.AppConfigurationEntry;
+import javax.security.auth.login.Configuration;
+import java.util.HashMap;
+
+/**
+ * JAAS Configuration to use when logging in with username/password.
+ */
+public class PasswordConfiguration extends Configuration {
+
+ @Override
+ public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+ HashMap<String, String> options = new HashMap<String, String>();
+ options.put("storeKey", "true");
+ options.put("refreshKrb5Config", "true");
+
+ final String krbLoginModuleName = ConfigurationUtil.IS_IBM
+ ? ConfigurationUtil.IBM_KRB5_LOGIN_MODULE : ConfigurationUtil.SUN_KRB5_LOGIN_MODULE;
+
+ return new AppConfigurationEntry[] {
+ new AppConfigurationEntry(
+ krbLoginModuleName,
+ AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+ options
+ )
+ };
+ }
+
+}
diff --git a/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/UsernamePasswordCallbackHandler.java b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/UsernamePasswordCallbackHandler.java
new file mode 100644
index 0000000..222a6fe
--- /dev/null
+++ b/nifi-commons/nifi-security-utils/src/main/java/org/apache/nifi/security/krb/UsernamePasswordCallbackHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.security.krb;
+
+import org.apache.commons.lang3.Validate;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import java.io.IOException;
+
+/**
+ * CallbackHandler that provides the given username and password.
+ */
+public class UsernamePasswordCallbackHandler implements CallbackHandler {
+
+ private final String username;
+ private final String password;
+
+ public UsernamePasswordCallbackHandler(final String username, final String password) {
+ this.username = username;
+ this.password = password;
+ Validate.notBlank(this.username);
+ Validate.notBlank(this.password);
+ }
+
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+ for (final Callback callback : callbacks) {
+ if (callback instanceof NameCallback) {
+ final NameCallback nameCallback = (NameCallback) callback;
+ nameCallback.setName(username);
+ } else if (callback instanceof PasswordCallback) {
+ final PasswordCallback passwordCallback = (PasswordCallback) callback;
+ passwordCallback.setPassword(password.toCharArray());
+ } else {
+ throw new IllegalStateException("Unexpected callback type: " + callback.getClass().getCanonicalName());
+ }
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
index 3581576..dcbf5a1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/pom.xml
@@ -67,121 +67,13 @@ limitations under the License.
<version>1.8.1</version>
</dependency>
- <!-- Schema Registry Serdes Jar that also pulls in registry client jars -->
+ <!-- Schema Registry Client-->
<dependency>
<groupId>com.hortonworks.registries</groupId>
<artifactId>schema-registry-client</artifactId>
<version>${hwx.registry.version}</version>
<exclusions>
<exclusion>
- <groupId>org.apache.avro</groupId>
- <artifactId>avro</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-clients</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.jboss.logging</groupId>
- <artifactId>jboss-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.directory.server</groupId>
- <artifactId>apacheds-i18n</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.directory.server</groupId>
- <artifactId>apacheds-kerberos-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.servlet.jsp</groupId>
- <artifactId>jsp-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.javassist</groupId>
- <artifactId>javassist</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.tukaani</groupId>
- <artifactId>xz</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.thoughtworks.paranamer</groupId>
- <artifactId>paranamer</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.hibernate</groupId>
- <artifactId>hibernate-validator</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-annotations</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.fusesource.leveldbjni</groupId>
- <artifactId>leveldbjni-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-auth</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.glassfish.hk2</groupId>
- <artifactId>osgi-resource-locator</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.fge</groupId>
- <artifactId>btf</artifactId>
- </exclusion>
- <exclusion>
- <groupId>javax.mail</groupId>
- <artifactId>mailapi</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.googlecode.libphonenumber</groupId>
- <artifactId>libphonenumber</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.yaml</groupId>
- <artifactId>snakeyaml</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.fge</groupId>
- <artifactId>msg-simple</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.xerial.snappy</groupId>
- <artifactId>snappy-java</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.glassfish.hk2.external</groupId>
- <artifactId>aopalliance-repackaged</artifactId>
- </exclusion>
- <exclusion>
- <groupId>joda-time</groupId>
- <artifactId>joda-time</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.fasterxml.jackson.dataformat</groupId>
- <artifactId>jackson-dataformat-yaml</artifactId>
- </exclusion>
- <exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
@@ -193,33 +85,26 @@ limitations under the License.
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</exclusion>
- <exclusion>
- <groupId>org.mozilla</groupId>
- <artifactId>rhino</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.fge</groupId>
- <artifactId>uri-template</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.fge</groupId>
- <artifactId>jackson-coreutils</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.fge</groupId>
- <artifactId>json-schema-core</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.github.fge</groupId>
- <artifactId>json-schema-validator</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-compress</artifactId>
- </exclusion>
</exclusions>
</dependency>
+ <!-- Upgrade jersey to 2.26 -->
+ <dependency>
+ <groupId>org.glassfish.jersey.core</groupId>
+ <artifactId>jersey-client</artifactId>
+ <version>2.26</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.media</groupId>
+ <artifactId>jersey-media-multipart</artifactId>
+ <version>2.26</version>
+ </dependency>
+ <dependency>
+ <groupId>org.glassfish.jersey.inject</groupId>
+ <artifactId>jersey-hk2</artifactId>
+ <version>2.26</version>
+ </dependency>
+
<!-- explicitly pulling in jackson and excluding above to not conflict with transitive test dependencies from nifi-mock -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
new file mode 100644
index 0000000..8162f6d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/com/hortonworks/registries/schemaregistry/client/SchemaRegistryClient.java
@@ -0,0 +1,1498 @@
+/**
+ * Copyright 2016-2019 Cloudera, Inc.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package com.hortonworks.registries.schemaregistry.client;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.collect.Sets;
+import com.hortonworks.registries.auth.KerberosLogin;
+import com.hortonworks.registries.auth.Login;
+import com.hortonworks.registries.auth.NOOPLogin;
+import com.hortonworks.registries.auth.util.JaasConfiguration;
+import com.hortonworks.registries.common.SchemaRegistryServiceInfo;
+import com.hortonworks.registries.common.SchemaRegistryVersion;
+import com.hortonworks.registries.common.catalog.CatalogResponse;
+import com.hortonworks.registries.common.util.ClassLoaderAwareInvocationHandler;
+import com.hortonworks.registries.schemaregistry.CompatibilityResult;
+import com.hortonworks.registries.schemaregistry.ConfigEntry;
+import com.hortonworks.registries.schemaregistry.SchemaBranch;
+import com.hortonworks.registries.schemaregistry.SchemaFieldQuery;
+import com.hortonworks.registries.schemaregistry.SchemaIdVersion;
+import com.hortonworks.registries.schemaregistry.SchemaMetadata;
+import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
+import com.hortonworks.registries.schemaregistry.SchemaProviderInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersion;
+import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
+import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
+import com.hortonworks.registries.schemaregistry.SchemaVersionMergeResult;
+import com.hortonworks.registries.schemaregistry.SchemaVersionRetriever;
+import com.hortonworks.registries.schemaregistry.SerDesInfo;
+import com.hortonworks.registries.schemaregistry.SerDesPair;
+import com.hortonworks.registries.schemaregistry.cache.SchemaVersionInfoCache;
+import com.hortonworks.registries.schemaregistry.errors.IncompatibleSchemaException;
+import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaBranchDeletionException;
+import com.hortonworks.registries.schemaregistry.errors.InvalidSchemaException;
+import com.hortonworks.registries.schemaregistry.errors.SchemaBranchAlreadyExistsException;
+import com.hortonworks.registries.schemaregistry.errors.SchemaBranchNotFoundException;
+import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
+import com.hortonworks.registries.schemaregistry.exceptions.RegistryRetryableException;
+import com.hortonworks.registries.schemaregistry.serde.SerDesException;
+import com.hortonworks.registries.schemaregistry.serde.SnapshotDeserializer;
+import com.hortonworks.registries.schemaregistry.serde.SnapshotSerializer;
+import com.hortonworks.registries.schemaregistry.serde.pull.PullDeserializer;
+import com.hortonworks.registries.schemaregistry.serde.pull.PullSerializer;
+import com.hortonworks.registries.schemaregistry.serde.push.PushDeserializer;
+import com.hortonworks.registries.schemaregistry.state.SchemaLifecycleException;
+import com.hortonworks.registries.schemaregistry.state.SchemaVersionLifecycleStateMachineInfo;
+import org.apache.commons.io.IOUtils;
+import org.glassfish.jersey.SslConfigurator;
+import org.glassfish.jersey.client.ClientConfig;
+import org.glassfish.jersey.client.ClientProperties;
+import org.glassfish.jersey.client.JerseyClientBuilder;
+import org.glassfish.jersey.media.multipart.BodyPart;
+import org.glassfish.jersey.media.multipart.FormDataMultiPart;
+import org.glassfish.jersey.media.multipart.MultiPart;
+import org.glassfish.jersey.media.multipart.MultiPartFeature;
+import org.glassfish.jersey.media.multipart.file.StreamDataBodyPart;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.SSLContext;
+import javax.security.auth.login.LoginException;
+import javax.ws.rs.BadRequestException;
+import javax.ws.rs.NotFoundException;
+import javax.ws.rs.client.Client;
+import javax.ws.rs.client.ClientBuilder;
+import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.WebTarget;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Proxy;
+import java.net.URLEncoder;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_CONNECTION_TIMEOUT;
+import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.DEFAULT_READ_TIMEOUT;
+import static com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL;
+
+/**
+ * NOTE: This class is a copy of the SchemaRegistryClient from https://github.com/hortonworks/registry.
+ * The changes to this file are the following:
+ * - Making the 'private Login login' become protected for access by sub-classes
+ * - Making the 'private Configuration configuration' become protected for access by sub-classes
+ *
+ * This is the default implementation of {@link ISchemaRegistryClient} which connects to the given {@code rootCatalogURL}.
+ * <p>
+ * An instance of SchemaRegistryClient can be instantiated by passing configuration properties like below.
+ * <pre>
+ * SchemaRegistryClient schemaRegistryClient = new SchemaRegistryClient(config);
+ * </pre>
+ * <p>
+ * There are different options available as mentioned in {@link Configuration} like
+ * <pre>
+ * - {@link Configuration#SCHEMA_REGISTRY_URL}.
+ * - {@link Configuration#SCHEMA_METADATA_CACHE_SIZE}.
+ * - {@link Configuration#SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS}.
+ * - {@link Configuration#SCHEMA_VERSION_CACHE_SIZE}.
+ * - {@link Configuration#SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS}.
+ * - {@link Configuration#SCHEMA_TEXT_CACHE_SIZE}.
+ * - {@link Configuration#SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS}.
+ *
+ * and many other properties like {@link ClientProperties}
+ * </pre>
+ * <pre>
+ * This can be used to
+ * - register schema metadata
+ * - add new versions of a schema
+ * - fetch different versions of schema
+ * - fetch latest version of a schema
+ * - check whether the given schema text is compatible with a latest version of the schema
+ * - register serializer/deserializer for a schema
+ * - fetch serializer/deserializer for a schema
+ * </pre>
+ */
+public class SchemaRegistryClient implements ISchemaRegistryClient {
+ private static final Logger LOG = LoggerFactory.getLogger(SchemaRegistryClient.class);
+
+ private static final String SCHEMA_REGISTRY_PATH = "/schemaregistry";
+ private static final String SCHEMAS_PATH = SCHEMA_REGISTRY_PATH + "/schemas/";
+ private static final String SCHEMA_PROVIDERS_PATH = SCHEMA_REGISTRY_PATH + "/schemaproviders/";
+ private static final String SCHEMAS_BY_ID_PATH = SCHEMA_REGISTRY_PATH + "/schemasById/";
+ private static final String SCHEMA_VERSIONS_PATH = SCHEMAS_PATH + "versions/";
+ private static final String FILES_PATH = SCHEMA_REGISTRY_PATH + "/files/";
+ private static final String SERIALIZERS_PATH = SCHEMA_REGISTRY_PATH + "/serdes/";
+ private static final String REGISTY_CLIENT_JAAS_SECTION = "RegistryClient";
+ private static final Set<Class<?>> DESERIALIZER_INTERFACE_CLASSES = Sets.<Class<?>>newHashSet(SnapshotDeserializer.class, PullDeserializer.class, PushDeserializer.class);
+ private static final Set<Class<?>> SERIALIZER_INTERFACE_CLASSES = Sets.<Class<?>>newHashSet(SnapshotSerializer.class, PullSerializer.class);
+ private static final String SEARCH_FIELDS = SCHEMA_REGISTRY_PATH + "/search/schemas/fields";
+ private static final long KERBEROS_SYNCHRONIZATION_TIMEOUT_MS = 180000;
+
+ private static final String SSL_KEY_PASSWORD = "keyPassword";
+ private static final String SSL_KEY_STORE_PATH = "keyStorePath";
+
+ private static final SchemaRegistryVersion CLIENT_VERSION = SchemaRegistryServiceInfo.get().version();
+
+ protected Login login;
+ private final Client client;
+ private final UrlSelector urlSelector;
+ private final Map<String, SchemaRegistryTargets> urlWithTargets;
+
+ protected final Configuration configuration;
+ private final ClassLoaderCache classLoaderCache;
+ private final SchemaVersionInfoCache schemaVersionInfoCache;
+ private final SchemaMetadataCache schemaMetadataCache;
+ private final Cache<SchemaDigestEntry, SchemaIdVersion> schemaTextCache;
+
+ private static final String SSL_CONFIGURATION_KEY = "schema.registry.client.ssl";
+ private static final String HOSTNAME_VERIFIER_CLASS_KEY = "hostnameVerifierClass";
+
+ /**
+ * Creates {@link SchemaRegistryClient} instance with the given yaml config.
+ *
+ * @param confFile config file which contains the configuration entries.
+ *
+ * @throws IOException when any IOException occurs while reading the given confFile
+ */
+ public SchemaRegistryClient(File confFile) throws IOException {
+ this(buildConfFromFile(confFile));
+ }
+
+ private static Map<String, ?> buildConfFromFile(File confFile) throws IOException {
+ try (FileInputStream fis = new FileInputStream(confFile)) {
+ return (Map<String, Object>) new Yaml().load(IOUtils.toString(fis, "UTF-8"));
+ }
+ }
+
+ public SchemaRegistryClient(Map<String, ?> conf) {
+ configuration = new Configuration(conf);
+ initializeSecurityContext();
+ ClientConfig config = createClientConfig(conf);
+ ClientBuilder clientBuilder = JerseyClientBuilder.newBuilder()
+ .withConfig(config)
+ .property(ClientProperties.FOLLOW_REDIRECTS, Boolean.TRUE);
+ if (conf.containsKey(SSL_CONFIGURATION_KEY)) {
+ Map<String, String> sslConfigurations = (Map<String, String>) conf.get(SSL_CONFIGURATION_KEY);
+ clientBuilder.sslContext(createSSLContext(sslConfigurations));
+ if (sslConfigurations.containsKey(HOSTNAME_VERIFIER_CLASS_KEY)) {
+ HostnameVerifier hostNameVerifier = null;
+ String hostNameVerifierClassName = sslConfigurations.get(HOSTNAME_VERIFIER_CLASS_KEY);
+ try {
+ hostNameVerifier = (HostnameVerifier) Class.forName(hostNameVerifierClassName).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to instantiate hostNameVerifierClass : " + hostNameVerifierClassName, e);
+ }
+ clientBuilder.hostnameVerifier(hostNameVerifier);
+ }
+ }
+ client = clientBuilder.build();
+ client.register(MultiPartFeature.class);
+
+ // get list of urls and create given or default UrlSelector.
+ urlSelector = createUrlSelector();
+ urlWithTargets = new ConcurrentHashMap<>();
+
+ classLoaderCache = new ClassLoaderCache(this);
+
+ schemaVersionInfoCache = new SchemaVersionInfoCache(
+ new SchemaVersionRetriever() {
+ @Override
+ public SchemaVersionInfo retrieveSchemaVersion(SchemaVersionKey key) throws SchemaNotFoundException {
+ return doGetSchemaVersionInfo(key);
+ }
+
+ @Override
+ public SchemaVersionInfo retrieveSchemaVersion(SchemaIdVersion key) throws SchemaNotFoundException {
+ return doGetSchemaVersionInfo(key);
+ }
+ },
+ ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_SIZE.name())).intValue(),
+ ((Number) configuration.getValue(Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name())).longValue() * 1000L
+ );
+
+ SchemaMetadataCache.SchemaMetadataFetcher schemaMetadataFetcher = createSchemaMetadataFetcher();
+ schemaMetadataCache = new SchemaMetadataCache(((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_SIZE
+ .name())).longValue(),
+ ((Number) configuration.getValue(Configuration.SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS
+ .name())).longValue(),
+ schemaMetadataFetcher);
+
+ schemaTextCache = CacheBuilder.newBuilder()
+ .maximumSize(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_SIZE
+ .name())).longValue())
+ .expireAfterAccess(((Number) configuration.getValue(Configuration.SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS
+ .name())).longValue(),
+ TimeUnit.SECONDS)
+ .build();
+ }
+
+ protected void initializeSecurityContext() {
+ String saslJaasConfig = configuration.getValue(Configuration.SASL_JAAS_CONFIG.name());
+ if (saslJaasConfig != null) {
+ KerberosLogin kerberosLogin = new KerberosLogin(KERBEROS_SYNCHRONIZATION_TIMEOUT_MS);
+ try {
+ kerberosLogin.configure(new HashMap<>(), REGISTY_CLIENT_JAAS_SECTION, new JaasConfiguration(REGISTY_CLIENT_JAAS_SECTION, saslJaasConfig));
+ kerberosLogin.login();
+ login = kerberosLogin;
+ return;
+ } catch (LoginException e) {
+ LOG.error("Failed to initialize the dynamic JAAS config: " + saslJaasConfig + ". Attempting static JAAS config.");
+ } catch (Exception e) {
+ LOG.error("Failed to parse the dynamic JAAS config. Attempting static JAAS config.", e);
+ }
+ }
+
+ String jaasConfigFile = System.getProperty("java.security.auth.login.config");
+ if (jaasConfigFile != null && !jaasConfigFile.trim().isEmpty()) {
+ KerberosLogin kerberosLogin = new KerberosLogin(KERBEROS_SYNCHRONIZATION_TIMEOUT_MS);
+ kerberosLogin.configure(new HashMap<>(), REGISTY_CLIENT_JAAS_SECTION);
+ try {
+ kerberosLogin.login();
+ login = kerberosLogin;
+ } catch (LoginException e) {
+ LOG.error("Could not login using jaas config section " + REGISTY_CLIENT_JAAS_SECTION);
+ login = new NOOPLogin();
+ }
+ } else {
+ LOG.warn("System property for jaas config file is not defined. Its okay if schema registry is not running in secured mode");
+ login = new NOOPLogin();
+ }
+ }
+
+ protected SSLContext createSSLContext(Map<String, String> sslConfigurations) {
+ SslConfigurator sslConfigurator = SslConfigurator.newInstance();
+ if (sslConfigurations.containsKey(SSL_KEY_STORE_PATH)) {
+ sslConfigurator.keyStoreType(sslConfigurations.get("keyStoreType"))
+ .keyStoreFile(sslConfigurations.get(SSL_KEY_STORE_PATH))
+ .keyStorePassword(sslConfigurations.get("keyStorePassword"))
+ .keyStoreProvider(sslConfigurations.get("keyStoreProvider"))
+ .keyManagerFactoryAlgorithm(sslConfigurations.get("keyManagerFactoryAlgorithm"))
+ .keyManagerFactoryProvider(sslConfigurations.get("keyManagerFactoryProvider"));
+ if (sslConfigurations.containsKey(SSL_KEY_PASSWORD)) {
+ sslConfigurator.keyPassword(sslConfigurations.get(SSL_KEY_PASSWORD));
+ }
+ }
+
+
+ sslConfigurator.trustStoreType(sslConfigurations.get("trustStoreType"))
+ .trustStoreFile(sslConfigurations.get("trustStorePath"))
+ .trustStorePassword(sslConfigurations.get("trustStorePassword"))
+ .trustStoreProvider(sslConfigurations.get("trustStoreProvider"))
+ .trustManagerFactoryAlgorithm(sslConfigurations.get("trustManagerFactoryAlgorithm"))
+ .trustManagerFactoryProvider(sslConfigurations.get("trustManagerFactoryProvider"));
+
+ sslConfigurator.securityProtocol(sslConfigurations.get("protocol"));
+
+ return sslConfigurator.createSSLContext();
+ }
+
+ private SchemaRegistryTargets currentSchemaRegistryTargets() {
+ String url = urlSelector.select();
+ urlWithTargets.computeIfAbsent(url, s -> new SchemaRegistryTargets(client.target(s)));
+ return urlWithTargets.get(url);
+ }
+
+ private static class SchemaRegistryTargets {
+ private final WebTarget schemaProvidersTarget;
+ private final WebTarget schemasTarget;
+ private final WebTarget schemasByIdTarget;
+ private final WebTarget rootTarget;
+ private final WebTarget searchFieldsTarget;
+ private final WebTarget serializersTarget;
+ private final WebTarget filesTarget;
+ private final WebTarget schemaVersionsTarget;
+ private final WebTarget schemaVersionsByIdTarget;
+ private final WebTarget schemaVersionsStatesMachineTarget;
+
+ SchemaRegistryTargets(WebTarget rootTarget) {
+ this.rootTarget = rootTarget;
+ schemaProvidersTarget = rootTarget.path(SCHEMA_PROVIDERS_PATH);
+ schemasTarget = rootTarget.path(SCHEMAS_PATH);
+ schemasByIdTarget = rootTarget.path(SCHEMAS_BY_ID_PATH);
+ schemaVersionsByIdTarget = schemasTarget.path("versionsById");
+ schemaVersionsTarget = rootTarget.path(SCHEMA_VERSIONS_PATH);
+ schemaVersionsStatesMachineTarget = schemaVersionsTarget.path("statemachine");
+ searchFieldsTarget = rootTarget.path(SEARCH_FIELDS);
+ serializersTarget = rootTarget.path(SERIALIZERS_PATH);
+ filesTarget = rootTarget.path(FILES_PATH);
+ }
+
+ }
+
+ private UrlSelector createUrlSelector() {
+ UrlSelector urlSelector = null;
+ String rootCatalogURL = configuration.getValue(SCHEMA_REGISTRY_URL.name());
+ String urlSelectorClass = configuration.getValue(Configuration.URL_SELECTOR_CLASS.name());
+ if (urlSelectorClass == null) {
+ urlSelector = new LoadBalancedFailoverUrlSelector(rootCatalogURL);
+ } else {
+ try {
+ urlSelector = (UrlSelector) Class.forName(urlSelectorClass)
+ .getConstructor(String.class)
+ .newInstance(rootCatalogURL);
+ } catch (InstantiationException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException
+ | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ urlSelector.init(configuration.getConfig());
+
+ return urlSelector;
+ }
+
    /**
     * Builds the fetcher the schema-metadata cache invokes on a miss: it resolves
     * metadata by name or by id against the currently selected registry target,
     * translating a REST 404 ({@link NotFoundException}) into the client API's
     * {@link SchemaNotFoundException}.
     */
    private SchemaMetadataCache.SchemaMetadataFetcher createSchemaMetadataFetcher() {
        return new SchemaMetadataCache.SchemaMetadataFetcher() {
            @Override
            public SchemaMetadataInfo fetch(String name) throws SchemaNotFoundException {
                try {
                    return getEntity(currentSchemaRegistryTargets().schemasTarget.path(name), SchemaMetadataInfo.class);
                } catch (NotFoundException e) {
                    throw new SchemaNotFoundException(e);
                }
            }

            @Override
            public SchemaMetadataInfo fetch(Long id) throws SchemaNotFoundException {
                try {
                    return getEntity(currentSchemaRegistryTargets().schemasByIdTarget.path(id.toString()), SchemaMetadataInfo.class);
                } catch (NotFoundException e) {
                    throw new SchemaNotFoundException(e);
                }
            }
        };
    }
+
+ protected ClientConfig createClientConfig(Map<String, ?> conf) {
+ ClientConfig config = new ClientConfig();
+ config.property(ClientProperties.CONNECT_TIMEOUT, DEFAULT_CONNECTION_TIMEOUT);
+ config.property(ClientProperties.READ_TIMEOUT, DEFAULT_READ_TIMEOUT);
+ config.property(ClientProperties.FOLLOW_REDIRECTS, true);
+ for (Map.Entry<String, ?> entry : conf.entrySet()) {
+ config.property(entry.getKey(), entry.getValue());
+ }
+ return config;
+ }
+
    /**
     * Returns the effective client configuration this instance was built with.
     */
    public Configuration getConfiguration() {
        return configuration;
    }
+
    /**
     * Lists the schema provider types supported by the target registry.
     */
    @Override
    public Collection<SchemaProviderInfo> getSupportedSchemaProviders() {
        return getEntities(currentSchemaRegistryTargets().schemaProvidersTarget, SchemaProviderInfo.class);
    }
+
    /**
     * Registers the given schema metadata; delegates to
     * {@link #addSchemaMetadata(SchemaMetadata)}.
     */
    @Override
    public Long registerSchemaMetadata(SchemaMetadata schemaMetadata) {
        return addSchemaMetadata(schemaMetadata);
    }
+
+ @Override
+ public Long addSchemaMetadata(SchemaMetadata schemaMetadata) {
+ SchemaMetadataInfo schemaMetadataInfo = schemaMetadataCache.getIfPresent(SchemaMetadataCache.Key.of(schemaMetadata
+ .getName()));
+ if (schemaMetadataInfo == null) {
+ return doRegisterSchemaMetadata(schemaMetadata, currentSchemaRegistryTargets().schemasTarget);
+ }
+
+ return schemaMetadataInfo.getId();
+ }
+
+ @Override
+ public SchemaMetadataInfo updateSchemaMetadata(String schemaName, SchemaMetadata schemaMetadata) {
+ SchemaMetadataInfo schemaMetadataInfo = postEntity(currentSchemaRegistryTargets().schemasTarget.path(schemaName), schemaMetadata, SchemaMetadataInfo.class);
+ if (schemaMetadataInfo != null) {
+ schemaMetadataCache.put(SchemaMetadataCache.Key.of(schemaName), schemaMetadataInfo);
+ }
+ return schemaMetadataInfo;
+ }
+
+
+ private Long doRegisterSchemaMetadata(SchemaMetadata schemaMetadata, WebTarget schemasTarget) {
+ try {
+ return postEntity(schemasTarget, schemaMetadata, Long.class);
+ } catch(BadRequestException ex) {
+ Response response = ex.getResponse();
+ CatalogResponse catalogResponse = SchemaRegistryClient.readCatalogResponse(response.readEntity(String.class));
+ if(catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.ENTITY_CONFLICT.getCode()) {
+ return getSchemaMetadataInfo(schemaMetadata.getName()).getId();
+ } else {
+ throw ex;
+ }
+ }
+ }
+
    /**
     * Returns schema metadata by name, served from the metadata cache (which
     * loads from the registry on a miss).
     */
    @Override
    public SchemaMetadataInfo getSchemaMetadataInfo(String schemaName) {
        return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaName));
    }
+
    /**
     * Returns schema metadata by id, served from the metadata cache (which loads
     * from the registry on a miss).
     */
    @Override
    public SchemaMetadataInfo getSchemaMetadataInfo(Long schemaMetadataId) {
        return schemaMetadataCache.get(SchemaMetadataCache.Key.of(schemaMetadataId));
    }
+
+ @Override
+ public void deleteSchema(String schemaName) throws SchemaNotFoundException {
+ Collection<SchemaVersionInfo> schemaVersionInfos = getAllVersions(schemaName);
+ schemaMetadataCache.invalidateSchemaMetadata(SchemaMetadataCache.Key.of(schemaName));
+ if (schemaVersionInfos != null) {
+ for (SchemaVersionInfo schemaVersionInfo: schemaVersionInfos) {
+ SchemaIdVersion schemaIdVersion = new SchemaIdVersion(schemaVersionInfo.getId());
+ schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
+ }
+ }
+
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s", schemaName));
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ int status = response.getStatus();
+ if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+ throw new SchemaNotFoundException(response.readEntity(String.class));
+ } else if (status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
+ throw new RuntimeException(response.readEntity(String.class));
+ }
+ }
+
    /**
     * Adds a schema version on the default MASTER branch; delegates to the
     * branch-aware overload.
     */
    @Override
    public SchemaIdVersion addSchemaVersion(SchemaMetadata schemaMetadata, SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws
            InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
        return addSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaMetadata, schemaVersion, disableCanonicalCheck);
    }
+
    /**
     * Registers the schema metadata if needed, then adds the given version on the
     * named branch. The schema-text cache is consulted first so an identical,
     * already-registered text does not trigger a remote call.
     *
     * @return the id/version of the (possibly pre-existing) schema version
     */
    @Override
    public SchemaIdVersion addSchemaVersion(String schemaBranchName, SchemaMetadata schemaMetadata, SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws
            InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
        // get it, if it exists in cache
        SchemaDigestEntry schemaDigestEntry = buildSchemaTextEntry(schemaVersion, schemaMetadata.getName());
        SchemaIdVersion schemaIdVersion = schemaTextCache.getIfPresent(schemaDigestEntry);

        if (schemaIdVersion == null) {
            //register schema metadata if it does not exist
            Long metadataId = registerSchemaMetadata(schemaMetadata);
            if (metadataId == null) {
                LOG.error("Schema Metadata [{}] is not registered successfully", schemaMetadata);
                throw new RuntimeException("Given SchemaMetadata could not be registered: " + schemaMetadata);
            }

            // add schemaIdVersion
            schemaIdVersion = addSchemaVersion(schemaBranchName, schemaMetadata.getName(), schemaVersion, disableCanonicalCheck);
        }

        return schemaIdVersion;
    }
+
    /**
     * Uploads a schema version from a stream onto the default MASTER branch;
     * delegates to the branch-aware overload.
     */
    @Override
    public SchemaIdVersion uploadSchemaVersion(String schemaName, String description, InputStream schemaVersionTextFile) throws
            InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
        return uploadSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaName, description, schemaVersionTextFile);
    }
+
+ public SchemaIdVersion uploadSchemaVersion(final String schemaBranchName,
+ final String schemaName,
+ final String description,
+ final InputStream schemaVersionInputStream)
+ throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+
+ SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaName);
+ if (schemaMetadataInfo == null) {
+ throw new SchemaNotFoundException("Schema with name " + schemaName + " not found");
+ }
+
+ StreamDataBodyPart streamDataBodyPart = new StreamDataBodyPart("file", schemaVersionInputStream);
+
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaName).path("/versions/upload").queryParam("branch",schemaBranchName);
+ MultiPart multipartEntity =
+ new FormDataMultiPart()
+ .field("description", description, MediaType.APPLICATION_JSON_TYPE)
+ .bodyPart(streamDataBodyPart);
+
+ Entity<MultiPart> multiPartEntity = Entity.entity(multipartEntity, MediaType.MULTIPART_FORM_DATA);
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request().post(multiPartEntity, Response.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ return handleSchemaIdVersionResponse(schemaMetadataInfo, response);
+ }
+
+ private SchemaDigestEntry buildSchemaTextEntry(SchemaVersion schemaVersion, String name) {
+ byte[] digest;
+ try {
+ digest = MessageDigest.getInstance("MD5").digest(schemaVersion.getSchemaText().getBytes("UTF-8"));
+ } catch (NoSuchAlgorithmException | UnsupportedEncodingException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+
+ // storing schema text string is expensive, so storing digest in cache's key.
+ return new SchemaDigestEntry(name, digest);
+ }
+
    /**
     * Adds a version of the named schema on the default MASTER branch; delegates
     * to the branch-aware overload.
     */
    @Override
    public SchemaIdVersion addSchemaVersion(final String schemaName, final SchemaVersion schemaVersion, boolean disableCanonicalCheck)
            throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
        return addSchemaVersion(SchemaBranch.MASTER_BRANCH, schemaName, schemaVersion, disableCanonicalCheck);
    }
+
+ @Override
+ public SchemaIdVersion addSchemaVersion(final String schemaBranchName, final String schemaName, final SchemaVersion schemaVersion, boolean disableCanonicalCheck)
+ throws InvalidSchemaException, IncompatibleSchemaException, SchemaNotFoundException, SchemaBranchNotFoundException {
+
+ try {
+ return schemaTextCache.get(buildSchemaTextEntry(schemaVersion, schemaName),
+ () -> doAddSchemaVersion(schemaBranchName, schemaName, schemaVersion, disableCanonicalCheck));
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ LOG.error("Encountered error while adding new version [{}] of schema [{}] and error [{}]", schemaVersion, schemaName, e);
+ if (cause != null) {
+ if (cause instanceof InvalidSchemaException)
+ throw (InvalidSchemaException) cause;
+ else if (cause instanceof IncompatibleSchemaException) {
+ throw (IncompatibleSchemaException) cause;
+ } else if (cause instanceof SchemaNotFoundException) {
+ throw (SchemaNotFoundException) cause;
+ } else {
+ throw new RuntimeException(cause.getMessage(), cause);
+ }
+ } else {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void deleteSchemaVersion(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException, SchemaLifecycleException {
+ schemaVersionInfoCache.invalidateSchema(new SchemaVersionInfoCache.Key(schemaVersionKey));
+
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%s", schemaVersionKey
+ .getSchemaName(), schemaVersionKey.getVersion()));
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).delete(Response.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ handleDeleteSchemaResponse(response);
+ }
+
+ private void handleDeleteSchemaResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException {
+ String msg = response.readEntity(String.class);
+ switch (Response.Status.fromStatusCode(response.getStatus())) {
+ case NOT_FOUND:
+ throw new SchemaNotFoundException(msg);
+ case BAD_REQUEST:
+ throw new SchemaLifecycleException(msg);
+ case INTERNAL_SERVER_ERROR:
+ throw new RuntimeException(msg);
+ }
+ }
+
+ private SchemaIdVersion doAddSchemaVersion(String schemaBranchName, String schemaName,
+ SchemaVersion schemaVersion, boolean disableCanonicalCheck) throws IncompatibleSchemaException, InvalidSchemaException, SchemaNotFoundException {
+ SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaName);
+ if (schemaMetadataInfo == null) {
+ throw new SchemaNotFoundException("Schema with name " + schemaName + " not found");
+ }
+
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaName).path("/versions").queryParam("branch", schemaBranchName)
+ .queryParam("disableCanonicalCheck", disableCanonicalCheck);
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(schemaVersion), Response.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ return handleSchemaIdVersionResponse(schemaMetadataInfo, response);
+ }
+
    /**
     * Interprets the registry's response to an add/upload-version request. A
     * 400/500 carrying a catalog error code is mapped onto the API's exceptions;
     * otherwise the body holds the new version number, which is combined with
     * freshly fetched version info into a {@link SchemaIdVersion}.
     *
     * @throws IncompatibleSchemaException if the catalog reports INCOMPATIBLE_SCHEMA
     * @throws InvalidSchemaException      if the catalog reports INVALID_SCHEMA
     */
    private SchemaIdVersion handleSchemaIdVersionResponse(SchemaMetadataInfo schemaMetadataInfo,
                                                          Response response) throws IncompatibleSchemaException, InvalidSchemaException {
        int status = response.getStatus();
        String msg = response.readEntity(String.class);
        if (status == Response.Status.BAD_REQUEST.getStatusCode() || status == Response.Status.INTERNAL_SERVER_ERROR.getStatusCode()) {
            CatalogResponse catalogResponse = readCatalogResponse(msg);
            if (CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode() == catalogResponse.getResponseCode()) {
                throw new IncompatibleSchemaException(catalogResponse.getResponseMessage());
            } else if (CatalogResponse.ResponseMessage.INVALID_SCHEMA.getCode() == catalogResponse.getResponseCode()) {
                throw new InvalidSchemaException(catalogResponse.getResponseMessage());
            } else {
                throw new RuntimeException(catalogResponse.getResponseMessage());
            }

        }

        Integer version = readEntity(msg, Integer.class);

        // Fetch the created version directly (doGetSchemaVersionInfo bypasses the
        // cache) so the returned SchemaIdVersion carries the server-assigned id.
        SchemaVersionInfo schemaVersionInfo = doGetSchemaVersionInfo(new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata()
                .getName(), version));

        return new SchemaIdVersion(schemaMetadataInfo.getId(), version, schemaVersionInfo.getId());
    }
+
    /**
     * Parses a registry error body (JSON) into a {@link CatalogResponse}.
     * Malformed JSON is surfaced as a RuntimeException wrapping the IO error.
     */
    public static CatalogResponse readCatalogResponse(String msg) {
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            JsonNode node = objectMapper.readTree(msg);

            return objectMapper.treeToValue(node, CatalogResponse.class);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
+
+ public SchemaVersionInfo getSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
+ try {
+ return schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaIdVersion));
+ } catch (SchemaNotFoundException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
    /**
     * Fetches the latest version of the named schema on the default MASTER
     * branch; delegates to the branch-aware overload.
     */
    @Override
    public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaName) throws SchemaNotFoundException {
        return getLatestSchemaVersionInfo(SchemaBranch.MASTER_BRANCH, schemaName);
    }
+
+ @Override
+ public SchemaVersionInfo getSchemaVersionInfo(SchemaVersionKey schemaVersionKey) throws SchemaNotFoundException {
+ try {
+ return schemaVersionInfoCache.getSchema(SchemaVersionInfoCache.Key.of(schemaVersionKey));
+ } catch (SchemaNotFoundException ex) {
+ throw ex;
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
    /**
     * Fetches version info directly from the registry (bypassing the cache):
     * by schema-version id when present, otherwise by resolving the metadata id
     * to a name and fetching by (name, version).
     *
     * @throws IllegalArgumentException if the SchemaIdVersion carries neither id
     */
    private SchemaVersionInfo doGetSchemaVersionInfo(SchemaIdVersion schemaIdVersion) throws SchemaNotFoundException {
        if (schemaIdVersion.getSchemaVersionId() != null) {
            LOG.info("Getting schema version from target registry for [{}]", schemaIdVersion.getSchemaVersionId());
            return getEntity(currentSchemaRegistryTargets()
                    .schemaVersionsByIdTarget
                    .path(schemaIdVersion.getSchemaVersionId().toString()),
                    SchemaVersionInfo.class);
        } else if (schemaIdVersion.getSchemaMetadataId() != null) {
            SchemaMetadataInfo schemaMetadataInfo = getSchemaMetadataInfo(schemaIdVersion.getSchemaMetadataId());
            SchemaVersionKey schemaVersionKey = new SchemaVersionKey(schemaMetadataInfo.getSchemaMetadata()
                    .getName(), schemaIdVersion.getVersion());
            LOG.info("Getting schema version from target registry for key [{}]", schemaVersionKey);
            return doGetSchemaVersionInfo(schemaVersionKey);
        }

        throw new IllegalArgumentException("Given argument not valid: " + schemaIdVersion);
    }
+
+ private SchemaVersionInfo doGetSchemaVersionInfo(SchemaVersionKey schemaVersionKey) {
+ LOG.info("Getting schema version from target registry for [{}]", schemaVersionKey);
+ String schemaName = schemaVersionKey.getSchemaName();
+ WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(String.format("%s/versions/%d", schemaName, schemaVersionKey
+ .getVersion()));
+
+ return getEntity(webTarget, SchemaVersionInfo.class);
+ }
+
+ @Override
+ public SchemaVersionInfo getLatestSchemaVersionInfo(String schemaBranchName, String schemaName) throws SchemaNotFoundException {
+ WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/versions/latest").queryParam("branch", schemaBranchName);;
+ return getEntity(webTarget, SchemaVersionInfo.class);
+ }
+
    /**
     * Lists all versions of the named schema on the default MASTER branch;
     * delegates to the branch-aware overload.
     */
    @Override
    public Collection<SchemaVersionInfo> getAllVersions(String schemaName) throws SchemaNotFoundException {
        return getAllVersions(SchemaBranch.MASTER_BRANCH, schemaName);
    }
+
    /**
     * URL-encodes a schema name for use in a request path.
     *
     * NOTE(review): URLEncoder performs application/x-www-form-urlencoded
     * encoding (a space becomes '+', not '%20'); presumably the server decodes
     * path segments the same way — confirm before changing the encoding scheme.
     */
    private static String encode(String schemaName) {
        try {
            return URLEncoder.encode(schemaName, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be supported, so this is effectively unreachable.
            throw new RuntimeException(e);
        }
    }
+
+ @Override
+ public void enableSchemaVersion(Long schemaVersionId)
+ throws SchemaNotFoundException, SchemaLifecycleException, IncompatibleSchemaException {
+ try {
+ transitionSchemaVersionState(schemaVersionId, "enable", null);
+ } catch (SchemaLifecycleException e) {
+ Throwable cause = e.getCause();
+ if (cause != null && cause instanceof IncompatibleSchemaException) {
+ throw (IncompatibleSchemaException) cause;
+ }
+ throw e;
+ }
+ }
+
    /**
     * Transitions a schema version to the disabled state.
     */
    @Override
    public void disableSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
        transitionSchemaVersionState(schemaVersionId, "disable", null);
    }
+
    /**
     * Transitions a schema version to the deleted state.
     */
    @Override
    public void deleteSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
        transitionSchemaVersionState(schemaVersionId, "delete", null);
    }
+
    /**
     * Transitions a schema version to the archived state.
     */
    @Override
    public void archiveSchemaVersion(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
        transitionSchemaVersionState(schemaVersionId, "archive", null);
    }
+
    /**
     * Transitions a schema version into the review workflow.
     */
    @Override
    public void startSchemaVersionReview(Long schemaVersionId) throws SchemaNotFoundException, SchemaLifecycleException {
        transitionSchemaVersionState(schemaVersionId, "startReview", null);
    }
+
+ @Override
+ public SchemaVersionMergeResult mergeSchemaVersion(Long schemaVersionId, boolean disableCanonicalCheck) throws SchemaNotFoundException, IncompatibleSchemaException {
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(schemaVersionId + "/merge").queryParam("disableCanonicalCheck", disableCanonicalCheck);
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request().post(null);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ int status = response.getStatus();
+ if (status == Response.Status.OK.getStatusCode()) {
+ String msg = response.readEntity(String.class);
+ return readEntity(msg, SchemaVersionMergeResult.class);
+ } else if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+ throw new SchemaNotFoundException(response.readEntity(String.class));
+ } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
+ throw new IncompatibleSchemaException(response.readEntity(String.class));
+ } else {
+ throw new RuntimeException(response.readEntity(String.class));
+ }
+ }
+
+ @Override
+ public void transitionState(Long schemaVersionId,
+ Byte targetStateId,
+ byte[] transitionDetails) throws SchemaNotFoundException, SchemaLifecycleException {
+ boolean result = transitionSchemaVersionState(schemaVersionId, targetStateId.toString(), transitionDetails);
+ }
+
    /**
     * Fetches the registry's schema-version lifecycle state machine definition.
     */
    @Override
    public SchemaVersionLifecycleStateMachineInfo getSchemaVersionLifecycleStateMachineInfo() {
        return getEntity(currentSchemaRegistryTargets().schemaVersionsStatesMachineTarget,
                SchemaVersionLifecycleStateMachineInfo.class);
    }
+
+ @Override
+ public SchemaBranch createSchemaBranch(Long schemaVersionId, SchemaBranch schemaBranch) throws SchemaBranchAlreadyExistsException, SchemaNotFoundException {
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path("versionsById/" + schemaVersionId + "/branch");
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(schemaBranch), Response.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ int status = response.getStatus();
+ if (status == Response.Status.OK.getStatusCode()) {
+ String msg = response.readEntity(String.class);
+ SchemaBranch returnedSchemaBranch = readEntity(msg, SchemaBranch.class);
+ return returnedSchemaBranch;
+ } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
+ throw new SchemaNotFoundException(response.readEntity(String.class));
+ } else if (status == Response.Status.CONFLICT.getStatusCode()) {
+ throw new SchemaBranchAlreadyExistsException(response.readEntity(String.class));
+ } else {
+ throw new RuntimeException(response.readEntity(String.class));
+ }
+ }
+
+ @Override
+ public Collection<SchemaBranch> getSchemaBranches(String schemaName) throws SchemaNotFoundException {
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/branches");
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request().get();
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ int status = response.getStatus();
+ if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+ throw new SchemaNotFoundException(response.readEntity(String.class));
+ } else if (status != Response.Status.OK.getStatusCode()) {
+ throw new RuntimeException(response.readEntity(String.class));
+ }
+
+ return parseResponseAsEntities(response.readEntity(String.class), SchemaBranch.class);
+ }
+
+ @Override
+ public void deleteSchemaBranch(Long schemaBranchId) throws SchemaBranchNotFoundException, InvalidSchemaBranchDeletionException {
+ WebTarget target = currentSchemaRegistryTargets().schemasTarget.path("branch/" + schemaBranchId);
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return target.request().delete();
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ int status = response.getStatus();
+ if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+ throw new SchemaBranchNotFoundException(response.readEntity(String.class));
+ } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
+ throw new InvalidSchemaBranchDeletionException(response.readEntity(String.class));
+ } else if (status != Response.Status.OK.getStatusCode()) {
+ throw new RuntimeException(response.readEntity(String.class));
+ }
+
+ }
+
    /**
     * Lists versions of the named schema on the given branch, filtered to the
     * given lifecycle state ids.
     */
    @Override
    public Collection<SchemaVersionInfo> getAllVersions(String schemaBranchName, String schemaName, List<Byte> stateIds) throws SchemaNotFoundException, SchemaBranchNotFoundException {
        WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/versions").queryParam("branch", schemaBranchName).queryParam("states", stateIds.toArray());
        return getEntities(webTarget, SchemaVersionInfo.class);
    }
+
+ private boolean transitionSchemaVersionState(Long schemaVersionId,
+ String operationOrTargetState,
+ byte[] transitionDetails) throws SchemaNotFoundException, SchemaLifecycleException {
+
+ WebTarget webTarget = currentSchemaRegistryTargets().schemaVersionsTarget.path(schemaVersionId + "/state/" + operationOrTargetState);
+ Response response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<Response>() {
+ @Override
+ public Response run() {
+ return webTarget.request().post(Entity.text(transitionDetails));
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ boolean result = handleSchemaLifeCycleResponse(response);
+
+ // invalidate this entry from cache.
+ schemaVersionInfoCache.invalidateSchema(SchemaVersionInfoCache.Key.of(new SchemaIdVersion(schemaVersionId)));
+
+ return result;
+ }
+
+ private boolean handleSchemaLifeCycleResponse(Response response) throws SchemaNotFoundException, SchemaLifecycleException {
+ boolean result;
+ int status = response.getStatus();
+ if (status == Response.Status.OK.getStatusCode()) {
+ result = response.readEntity(Boolean.class);
+ } else if (status == Response.Status.NOT_FOUND.getStatusCode()) {
+ throw new SchemaNotFoundException(response.readEntity(String.class));
+ } else if (status == Response.Status.BAD_REQUEST.getStatusCode()) {
+ CatalogResponse catalogResponse = readCatalogResponse(response.readEntity(String.class));
+ if (catalogResponse.getResponseCode() == CatalogResponse.ResponseMessage.INCOMPATIBLE_SCHEMA.getCode()) {
+ throw new SchemaLifecycleException(new IncompatibleSchemaException(catalogResponse.getResponseMessage()));
+ }
+ throw new SchemaLifecycleException(catalogResponse.getResponseMessage());
+
+ } else {
+ throw new RuntimeException(response.readEntity(String.class));
+ }
+
+ return result;
+ }
+
    /**
     * Lists all versions of the named schema on the given branch.
     */
    @Override
    public Collection<SchemaVersionInfo> getAllVersions(String schemaBranchName, String schemaName) throws SchemaNotFoundException {
        WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/versions").queryParam("branch", schemaBranchName);
        return getEntities(webTarget, SchemaVersionInfo.class);
    }
+
    /**
     * Checks compatibility of the given schema text against the default MASTER
     * branch; delegates to the branch-aware overload.
     */
    @Override
    public CompatibilityResult checkCompatibility(String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
        return checkCompatibility(SchemaBranch.MASTER_BRANCH, schemaName, toSchemaText);
    }
+
+ @Override
+ public CompatibilityResult checkCompatibility(String schemaBranchName, String schemaName,
+ String toSchemaText) throws SchemaNotFoundException {
+ WebTarget webTarget = currentSchemaRegistryTargets().schemasTarget.path(encode(schemaName) + "/compatibility").queryParam("branch", schemaBranchName);
+ String response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<String>() {
+ @Override
+ public String run() {
+ return webTarget.request().post(Entity.text(toSchemaText), String.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ return readEntity(response, CompatibilityResult.class);
+ }
+
    /**
     * True when the given text is compatible with every version on the default
     * MASTER branch; delegates to the branch-aware overload.
     */
    @Override
    public boolean isCompatibleWithAllVersions(String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
        return isCompatibleWithAllVersions(SchemaBranch.MASTER_BRANCH, schemaName, toSchemaText);
    }
+
    /**
     * True when the given text is compatible with every version on the given
     * branch; thin wrapper over {@link #checkCompatibility}.
     */
    @Override
    public boolean isCompatibleWithAllVersions(String schemaBranchName, String schemaName, String toSchemaText) throws SchemaNotFoundException, SchemaBranchNotFoundException {
        return checkCompatibility(schemaBranchName, schemaName, toSchemaText).isCompatible();
    }
+
+ @Override
+ public Collection<SchemaVersionKey> findSchemasByFields(SchemaFieldQuery schemaFieldQuery) {
+ WebTarget target = currentSchemaRegistryTargets().searchFieldsTarget;
+ for (Map.Entry<String, String> entry : schemaFieldQuery.toQueryMap().entrySet()) {
+ target = target.queryParam(entry.getKey(), entry.getValue());
+ }
+
+ return getEntities(target, SchemaVersionKey.class);
+ }
+
+ @Override
+ public String uploadFile(InputStream inputStream) {
+ MultiPart multiPart = new MultiPart();
+ BodyPart filePart = new StreamDataBodyPart("file", inputStream, "file");
+ multiPart.bodyPart(filePart);
+ try {
+ return login.doAction(new PrivilegedAction<String>() {
+ @Override
+ public String run() {
+ return currentSchemaRegistryTargets().filesTarget.request()
+ .post(Entity.entity(multiPart, MediaType.MULTIPART_FORM_DATA), String.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ }
+
+ @Override
+ public InputStream downloadFile(String fileId) {
+ try {
+ return login.doAction(new PrivilegedAction<InputStream>() {
+ @Override
+ public InputStream run() {
+ return currentSchemaRegistryTargets().filesTarget.path("download/" + encode(fileId))
+ .request()
+ .get(InputStream.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ }
+
    /**
     * Registers a serializer/deserializer pair and returns its server-assigned id.
     */
    @Override
    public Long addSerDes(SerDesPair serDesPair) {
        return postEntity(currentSchemaRegistryTargets().serializersTarget, serDesPair, Long.class);
    }
+
+ @Override
+ public void mapSchemaWithSerDes(String schemaName, Long serDesId) {
+ String path = String.format("%s/mapping/%s", encode(schemaName), serDesId.toString());
+
+ Boolean success = postEntity(currentSchemaRegistryTargets().schemasTarget.path(path), null, Boolean.class);
+ LOG.info("Received response while mapping schema [{}] with serialzer/deserializer [{}] : [{}]", schemaName, serDesId, success);
+ }
+
+    @Override
+    public <T> T getDefaultSerializer(String type) throws SerDesException {
+        // Instantiates the default serializer registered for the given schema provider type.
+        return createDefaultSerDesInstance(type, true);
+    }
+
+    @Override
+    public <T> T getDefaultDeserializer(String type) throws SerDesException {
+        // Instantiates the default deserializer registered for the given schema provider type.
+        return createDefaultSerDesInstance(type, false);
+    }
+
+    /**
+     * Shared implementation for the two methods above, which previously duplicated
+     * the same lookup/instantiation loop.
+     *
+     * @param type       schema provider type to look up (e.g. "avro")
+     * @param serializer true for the default serializer class, false for the deserializer
+     * @throws SerDesException          if the class cannot be loaded or instantiated
+     * @throws IllegalArgumentException if no provider is registered for the type
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T createDefaultSerDesInstance(String type, boolean serializer) throws SerDesException {
+        for (SchemaProviderInfo schemaProvider : getSupportedSchemaProviders()) {
+            if (schemaProvider.getType().equals(type)) {
+                String className = serializer
+                        ? schemaProvider.getDefaultSerializerClassName()
+                        : schemaProvider.getDefaultDeserializerClassName();
+                try {
+                    return (T) Class.forName(className).newInstance();
+                } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
+                    throw new SerDesException(e);
+                }
+            }
+        }
+
+        throw new IllegalArgumentException("No schema provider registered for the given type " + type);
+    }
+
+    @Override
+    public Collection<SerDesInfo> getSerDes(String schemaName) {
+        // Fetch every serializer/deserializer mapping registered for this schema.
+        final String serDesPath = encode(schemaName) + "/serdes/";
+        return getEntities(currentSchemaRegistryTargets().schemasTarget.path(serDesPath), SerDesInfo.class);
+    }
+
+    // Creates a serializer instance for the given ser/des info; the implementation
+    // class is loaded via the class loader cached for the pair's file id
+    // (see createInstance below).
+    public <T> T createSerializerInstance(SerDesInfo serDesInfo) {
+        return createInstance(serDesInfo, true);
+    }
+
+    // Counterpart of createSerializerInstance for the deserializer class.
+    @Override
+    public <T> T createDeserializerInstance(SerDesInfo serDesInfo) {
+        return createInstance(serDesInfo, false);
+    }
+
+    @Override
+    public void close() {
+        // Releases the underlying JAX-RS client and its connections.
+        client.close();
+    }
+
+    // Returns the static version descriptor of this client library.
+    public SchemaRegistryVersion clientVersion() {
+        return CLIENT_VERSION;
+    }
+
+    /**
+     * Loads the serializer or deserializer class named in {@code serDesInfo} using the
+     * class loader cached for the pair's file id, and returns a dynamic proxy over the
+     * registered interfaces it implements.
+     */
+    @SuppressWarnings("unchecked")
+    private <T> T createInstance(SerDesInfo serDesInfo, boolean isSerializer) {
+        Set<Class<?>> interfaceClasses = isSerializer ? SERIALIZER_INTERFACE_CLASSES : DESERIALIZER_INTERFACE_CLASSES;
+
+        if (interfaceClasses == null || interfaceClasses.isEmpty()) {
+            throw new IllegalArgumentException("interfaceClasses array must be neither null nor empty.");
+        }
+
+        // loading serializer, create a class loader and keep them in cache.
+        final SerDesPair serDesPair = serDesInfo.getSerDesPair();
+        String fileId = serDesPair.getFileId();
+        // get class loader for this file ID
+        ClassLoader classLoader = classLoaderCache.getClassLoader(fileId);
+
+        T t;
+        try {
+            String className =
+                    isSerializer ? serDesPair.getSerializerClassName() : serDesPair.getDeserializerClassName();
+
+            Class<T> clazz = (Class<T>) Class.forName(className, true, classLoader);
+            T instance = clazz.newInstance();
+            List<Class<?>> classes = new ArrayList<>();
+            for (Class<?> interfaceClass : interfaceClasses) {
+                if (interfaceClass.isAssignableFrom(clazz)) {
+                    classes.add(interfaceClass);
+                }
+            }
+
+            if (classes.isEmpty()) {
+                throw new RuntimeException("Given Serialize/Deserializer " + className + " class does not implement any " +
+                        "one of the registered interfaces: " + interfaceClasses);
+            }
+
+            // BUG FIX: the proxy was previously created but its result discarded, and the
+            // raw instance returned instead — so ClassLoaderAwareInvocationHandler never ran.
+            // Return the proxy so the handler wraps every invocation (presumably to switch
+            // to the jar's class loader, per its name — confirm against the handler impl).
+            t = (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
+                    classes.toArray(new Class[classes.size()]),
+                    new ClassLoaderAwareInvocationHandler(classLoader, instance));
+        } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+            throw new SerDesException(e);
+        }
+
+        return t;
+    }
+
+ private <T> List<T> getEntities(WebTarget target, Class<T> clazz) {
+ String response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<String>() {
+ @Override
+ public String run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).get(String.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ return parseResponseAsEntities(response, clazz);
+ }
+
+ private <T> List<T> parseResponseAsEntities(String response, Class<T> clazz) {
+ List<T> entities = new ArrayList<>();
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ JsonNode node = mapper.readTree(response);
+ Iterator<JsonNode> it = node.get("entities").elements();
+ while (it.hasNext()) {
+ entities.add(mapper.treeToValue(it.next(), clazz));
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
+ return entities;
+ }
+
+ private <T> T postEntity(WebTarget target, Object json, Class<T> responseType) {
+ String response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<String>() {
+ @Override
+ public String run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).post(Entity.json(json), String.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+ return readEntity(response, responseType);
+ }
+
+    private <T> T readEntity(String response, Class<T> clazz) {
+        // Deserialize a JSON response body into the requested type.
+        try {
+            return new ObjectMapper().readValue(response, clazz);
+        } catch (Exception ex) {
+            throw new RuntimeException(ex);
+        }
+    }
+
+ private <T> T getEntity(WebTarget target, Class<T> clazz) {
+ String response = null;
+ try {
+ response = login.doAction(new PrivilegedAction<String>() {
+ @Override
+ public String run() {
+ return target.request(MediaType.APPLICATION_JSON_TYPE).get(String.class);
+ }
+ });
+ } catch (LoginException e) {
+ throw new RegistryRetryableException(e);
+ }
+
+ return readEntity(response, clazz);
+ }
+
+    public static final class Configuration {
+        // we may want to remove schema.registry prefix from configuration properties as these are all properties
+        // given by client.
+        /**
+         * URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1
+         */
+        public static final ConfigEntry<String> SCHEMA_REGISTRY_URL =
+                ConfigEntry.mandatory("schema.registry.url",
+                        String.class,
+                        "URL of schema registry to which this client connects to. For ex: http://localhost:9090/api/v1",
+                        "http://localhost:9090/api/v1",
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        /**
+         * Default path for downloaded jars to be stored.
+         */
+        public static final String DEFAULT_LOCAL_JARS_PATH = "/tmp/schema-registry/local-jars";
+
+        /**
+         * Local directory path to which downloaded jars should be copied to. For ex: /tmp/schema-registry/local-jars
+         */
+        public static final ConfigEntry<String> LOCAL_JAR_PATH =
+                ConfigEntry.optional("schema.registry.client.local.jars.path",
+                        String.class,
+                        // FIX: was a copy-paste of the SCHEMA_REGISTRY_URL description
+                        "Local directory path to which downloaded jars should be copied to. For ex: /tmp/schema-registry/local-jars",
+                        DEFAULT_LOCAL_JARS_PATH,
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        /**
+         * Default value for classloader cache size.
+         */
+        public static final long DEFAULT_CLASSLOADER_CACHE_SIZE = 1024L;
+
+        /**
+         * Default value for cache expiry interval in seconds.
+         */
+        public static final long DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS = 60 * 60L;
+
+        /**
+         * Maximum size of classloader cache. Default value is {@link #DEFAULT_CLASSLOADER_CACHE_SIZE}
+         * Classloaders are created for serializer/deserializer jars downloaded from schema registry and they will be locally cached.
+         */
+        public static final ConfigEntry<Number> CLASSLOADER_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.class.loader.cache.size",
+                        Integer.class,
+                        "Maximum size of classloader cache",
+                        DEFAULT_CLASSLOADER_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in classloader cache. Default value is {@link #DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS}
+         * Classloaders are created for serializer/deserializer jars downloaded from schema registry and they will be locally cached.
+         */
+        public static final ConfigEntry<Number> CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.class.loader.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in classloader cache",
+                        DEFAULT_CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        public static final long DEFAULT_SCHEMA_CACHE_SIZE = 1024;
+        public static final long DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS = 5 * 60L;
+
+        /**
+         * Maximum size of schema version cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}
+         */
+        public static final ConfigEntry<Number> SCHEMA_VERSION_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.schema.version.cache.size",
+                        Integer.class,
+                        "Maximum size of schema version cache",
+                        DEFAULT_SCHEMA_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in schema version cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
+         */
+        public static final ConfigEntry<Number> SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.schema.version.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in schema version cache",
+                        DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Maximum size of schema metadata cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}
+         */
+        public static final ConfigEntry<Number> SCHEMA_METADATA_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.schema.metadata.cache.size",
+                        Integer.class,
+                        "Maximum size of schema metadata cache",
+                        DEFAULT_SCHEMA_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in schema metadata cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
+         */
+        public static final ConfigEntry<Number> SCHEMA_METADATA_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.schema.metadata.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in schema metadata cache",
+                        DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Maximum size of schema text cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_SIZE}.
+         * This cache has ability to store/get entries with same schema name and schema text.
+         */
+        public static final ConfigEntry<Number> SCHEMA_TEXT_CACHE_SIZE =
+                ConfigEntry.optional("schema.registry.client.schema.text.cache.size",
+                        Integer.class,
+                        "Maximum size of schema text cache",
+                        DEFAULT_SCHEMA_CACHE_SIZE,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Expiry interval(in seconds) of an entry in schema text cache. Default value is {@link #DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS}
+         */
+        public static final ConfigEntry<Number> SCHEMA_TEXT_CACHE_EXPIRY_INTERVAL_SECS =
+                ConfigEntry.optional("schema.registry.client.schema.text.cache.expiry.interval.secs",
+                        Integer.class,
+                        "Expiry interval(in seconds) of an entry in schema text cache.",
+                        DEFAULT_SCHEMA_CACHE_EXPIRY_INTERVAL_SECS,
+                        ConfigEntry.PositiveNumberValidator.get());
+
+        /**
+         * Fully-qualified class name of the URL selector used to pick among multiple registry URLs.
+         */
+        public static final ConfigEntry<String> URL_SELECTOR_CLASS =
+                ConfigEntry.optional("schema.registry.client.url.selector",
+                        String.class,
+                        "Schema Registry URL selector class.",
+                        FailoverUrlSelector.class.getName(),
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        /**
+         * Dynamic JAAS configuration string used for SASL/Kerberos connections.
+         */
+        public static final ConfigEntry<String> SASL_JAAS_CONFIG =
+                ConfigEntry.optional("sasl.jaas.config",
+                        String.class,
+                        "Schema Registry Dynamic JAAS config for SASL connection.",
+                        null,
+                        ConfigEntry.NonEmptyStringValidator.get());
+
+        // connection properties
+        /**
+         * Default connection timeout on connections created while connecting to schema registry.
+         */
+        public static final int DEFAULT_CONNECTION_TIMEOUT = 30 * 1000;
+
+        /**
+         * Default read timeout on connections created while connecting to schema registry.
+         */
+        public static final int DEFAULT_READ_TIMEOUT = 30 * 1000;
+
+        // effective configuration values keyed by property name
+        private final Map<String, ?> config;
+        // known ConfigEntry definitions keyed by property name, discovered via reflection
+        private final Map<String, ConfigEntry<?>> options;
+
+        public Configuration(Map<String, ?> config) {
+            Field[] fields = this.getClass().getDeclaredFields();
+            this.options = Collections.unmodifiableMap(buildOptions(fields));
+            this.config = buildConfig(config);
+        }
+
+        // Validates known entries against their validators and substitutes the
+        // entry's default when the supplied value is null; unknown keys pass through.
+        private Map<String, ?> buildConfig(Map<String, ?> config) {
+            Map<String, Object> result = new HashMap<>();
+            for (Map.Entry<String, ?> entry : config.entrySet()) {
+                String key = entry.getKey();
+                Object value = entry.getValue();
+
+                ConfigEntry configEntry = options.get(key);
+                if (configEntry != null) {
+                    if (value != null) {
+                        configEntry.validator().validate((value));
+                    } else {
+                        value = configEntry.defaultValue();
+                    }
+                }
+                result.put(key, value);
+            }
+
+            return result;
+        }
+
+        // Collects all ConfigEntry fields declared on this class into a name -> entry map.
+        private Map<String, ConfigEntry<?>> buildOptions(Field[] fields) {
+            Map<String, ConfigEntry<?>> options = new HashMap<>();
+            for (Field field : fields) {
+                Class<?> type = field.getType();
+
+                if (type.isAssignableFrom(ConfigEntry.class)) {
+                    field.setAccessible(true);
+                    try {
+                        ConfigEntry configEntry = (ConfigEntry) field.get(this);
+                        options.put(configEntry.name(), configEntry);
+                    } catch (IllegalAccessException e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+            return options;
+        }
+
+        // Returns the configured value for the key, falling back to the entry's default.
+        public <T> T getValue(String propertyKey) {
+            return (T) (config.containsKey(propertyKey) ? config.get(propertyKey)
+                    : options.get(propertyKey).defaultValue());
+        }
+
+        public Map<String, Object> getConfig() {
+            return Collections.unmodifiableMap(config);
+        }
+
+        public Collection<ConfigEntry<?>> getAvailableConfigEntries() {
+            return options.values();
+        }
+
+    }
+
+    // Cache key pairing a schema name with the digest of its schema text.
+    private static class SchemaDigestEntry {
+        private final String name;
+        private final byte[] schemaDigest;
+
+        SchemaDigestEntry(String name, byte[] schemaDigest) {
+            Preconditions.checkNotNull(name, "name can not be null");
+            Preconditions.checkNotNull(schemaDigest, "schema digest can not be null");
+
+            this.name = name;
+            this.schemaDigest = schemaDigest;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            final SchemaDigestEntry other = (SchemaDigestEntry) o;
+            final boolean sameName = (name == null) ? other.name == null : name.equals(other.name);
+            return sameName && Arrays.equals(schemaDigest, other.schemaDigest);
+        }
+
+        @Override
+        public int hashCode() {
+            final int nameHash = (name == null) ? 0 : name.hashCode();
+            return 31 * nameHash + Arrays.hashCode(schemaDigest);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
index a18a5bd..0e42afc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/HortonworksSchemaRegistry.java
@@ -31,6 +31,8 @@ import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
@@ -46,6 +48,7 @@ import org.apache.nifi.util.Tuple;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
@@ -117,10 +120,66 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
.required(false)
.build();
+ static final PropertyDescriptor KERBEROS_PRINCIPAL = new PropertyDescriptor.Builder()
+ .name("kerberos-principal")
+ .displayName("Kerberos Principal")
+ .description("The kerberos principal to authenticate with when not using the kerberos credentials service")
+ .defaultValue(null)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
+ static final PropertyDescriptor KERBEROS_PASSWORD = new PropertyDescriptor.Builder()
+ .name("kerberos-password")
+ .displayName("Kerberos Password")
+ .description("The password for the kerberos principal when not using the kerberos credentials service")
+ .defaultValue(null)
+ .required(false)
+ .sensitive(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private volatile boolean usingKerberosWithPassword = false;
private volatile SchemaRegistryClient schemaRegistryClient;
private volatile boolean initialized;
private volatile Map<String, Object> schemaRegistryConfig;
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+        final String kerberosPassword = validationContext.getProperty(KERBEROS_PASSWORD).getValue();
+
+        final KerberosCredentialsService kerberosCredentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE)
+                .asControllerService(KerberosCredentialsService.class);
+
+        final boolean principalSet = !StringUtils.isBlank(kerberosPrincipal);
+        final boolean passwordSet = !StringUtils.isBlank(kerberosPassword);
+
+        // The credentials service and an explicit principal/password are mutually exclusive.
+        if (kerberosCredentialsService != null && principalSet && passwordSet) {
+            results.add(new ValidationResult.Builder()
+                    .subject(KERBEROS_CREDENTIALS_SERVICE.getDisplayName())
+                    .valid(false)
+                    .explanation("kerberos principal/password and kerberos credential service cannot be configured at the same time")
+                    .build());
+        }
+
+        // Principal and password must be supplied together.
+        if (principalSet && !passwordSet) {
+            results.add(new ValidationResult.Builder()
+                    .subject(KERBEROS_PASSWORD.getDisplayName())
+                    .valid(false)
+                    .explanation("kerberos password is required when specifying a kerberos principal")
+                    .build());
+        }
+
+        if (!principalSet && passwordSet) {
+            results.add(new ValidationResult.Builder()
+                    .subject(KERBEROS_PRINCIPAL.getDisplayName())
+                    .valid(false)
+                    .explanation("kerberos principal is required when specifying a kerberos password")
+                    .build());
+        }
+
+        return results;
+    }
@OnEnabled
public void enable(final ConfigurationContext context) throws InitializationException {
@@ -146,17 +205,26 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
schemaRegistryConfig.put(CLIENT_SSL_PROPERTY_PREFIX, sslProperties);
}
+ final String kerberosPrincipal = context.getProperty(KERBEROS_PRINCIPAL).evaluateAttributeExpressions().getValue();
+ final String kerberosPassword = context.getProperty(KERBEROS_PASSWORD).getValue();
+
final KerberosCredentialsService kerberosCredentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE)
.asControllerService(KerberosCredentialsService.class);
+
if (kerberosCredentialsService != null) {
final String principal = kerberosCredentialsService.getPrincipal();
final String keytab = kerberosCredentialsService.getKeytab();
- final String jaasConfigString = getJaasConfig(principal, keytab);
+ final String jaasConfigString = getKeytabJaasConfig(principal, keytab);
schemaRegistryConfig.put(SchemaRegistryClient.Configuration.SASL_JAAS_CONFIG.name(), jaasConfigString);
+ } else if (!StringUtils.isBlank(kerberosPrincipal) && !StringUtils.isBlank(kerberosPassword)) {
+ schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL, kerberosPrincipal);
+ schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD, kerberosPassword);
+ schemaRegistryConfig.put(SchemaRegistryClientWithKerberosPassword.SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER, getLogger());
+ usingKerberosWithPassword = true;
}
}
- private String getJaasConfig(final String principal, final String keytab) {
+ private String getKeytabJaasConfig(final String principal, final String keytab) {
return "com.sun.security.auth.module.Krb5LoginModule required "
+ "useTicketCache=false "
+ "renewTicket=true "
@@ -205,20 +273,25 @@ public class HortonworksSchemaRegistry extends AbstractControllerService impleme
properties.add(CACHE_EXPIRATION);
properties.add(SSL_CONTEXT_SERVICE);
properties.add(KERBEROS_CREDENTIALS_SERVICE);
+ properties.add(KERBEROS_PRINCIPAL);
+ properties.add(KERBEROS_PASSWORD);
return properties;
}
protected synchronized SchemaRegistryClient getClient() {
if (!initialized) {
- schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
+ if (usingKerberosWithPassword) {
+ schemaRegistryClient = new SchemaRegistryClientWithKerberosPassword(schemaRegistryConfig);
+ } else {
+ schemaRegistryClient = new SchemaRegistryClient(schemaRegistryConfig);
+ }
initialized = true;
}
return schemaRegistryClient;
}
-
private SchemaVersionInfo getLatestSchemaVersionInfo(final SchemaRegistryClient client, final String schemaName, final String branchName)
throws org.apache.nifi.schema.access.SchemaNotFoundException {
try {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/KerberosUserLogin.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/KerberosUserLogin.java
new file mode 100644
index 0000000..d73f348
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/KerberosUserLogin.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.hortonworks;
+
+import com.hortonworks.registries.auth.Login;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosAction;
+import org.apache.nifi.security.krb.KerberosUser;
+
+import javax.security.auth.login.LoginContext;
+import javax.security.auth.login.LoginException;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+
+/**
+ * Implementation of Schema Registry's Login interface that wraps NiFi's KerberosUser API.
+ */
+public class KerberosUserLogin implements Login {
+
+    private final KerberosUser kerberosUser;
+    private final ComponentLog logger;
+
+    public KerberosUserLogin(final KerberosUser kerberosUser, final ComponentLog logger) {
+        this.kerberosUser = kerberosUser;
+        this.logger = logger;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs, String loginContextName) {
+        // Intentionally empty: the wrapped KerberosUser is fully configured at construction.
+    }
+
+    @Override
+    public LoginContext login() throws LoginException {
+        kerberosUser.login();
+
+        // the KerberosUser doesn't expose the LoginContext, but SchemaRegistryClient doesn't use
+        // the returned context at all, so we just return null here
+        return null;
+    }
+
+    @Override
+    public <T> T doAction(PrivilegedAction<T> action) throws LoginException {
+        // KerberosAction expects a PrivilegedExceptionAction, so adapt via a method reference.
+        final PrivilegedExceptionAction<T> wrappedAction = action::run;
+        return new KerberosAction<T>(kerberosUser, wrappedAction, logger).execute();
+    }
+
+    @Override
+    public void close() {
+        // Intentionally empty: logout of the KerberosUser is handled by the owning client.
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
new file mode 100644
index 0000000..ca84f8f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-hwx-schema-registry-bundle/nifi-hwx-schema-registry-service/src/main/java/org/apache/nifi/schemaregistry/hortonworks/SchemaRegistryClientWithKerberosPassword.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.schemaregistry.hortonworks;
+
+import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.security.krb.KerberosPasswordUser;
+import org.apache.nifi.security.krb.KerberosUser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.login.LoginException;
+import java.util.Map;
+
+/**
+ * Extend the SchemaRegistryClient so we can override the initialization of the security context and use
+ * the KerberosUserLogin implementation that lets us login with a principal/password.
+ */
+public class SchemaRegistryClientWithKerberosPassword extends SchemaRegistryClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SchemaRegistryClientWithKerberosPassword.class);
+
+ public static final String SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL = "schema.registry.client.kerberos.principal";
+ public static final String SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD = "schema.registry.client.kerberos.password";
+ public static final String SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER = "schema.registry.client.nifi.component.logger";
+
+ private KerberosUser kerberosUser;
+
+ public SchemaRegistryClientWithKerberosPassword(final Map<String, ?> conf) {
+ super(conf);
+ }
+
+ @Override
+ protected void initializeSecurityContext() {
+ final String principal = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PRINCIPAL);
+ if (principal == null) {
+ throw new IllegalArgumentException("Failed to login because principal is null");
+ }
+
+ final String password = configuration.getValue(SCHEMA_REGISTRY_CLIENT_KERBEROS_PASSWORD);
+ if (password == null) {
+ throw new IllegalArgumentException("Failed to login because password is null");
+ }
+
+ final Object loggerObject = configuration.getValue(SCHEMA_REGISTRY_CLIENT_NIFI_COMP_LOGGER);
+ if (loggerObject == null) {
+ throw new IllegalArgumentException("Failed to login because component logger is required");
+ }
+
+ if (!(loggerObject instanceof ComponentLog)) {
+ throw new IllegalArgumentException("Failed to login because logger object is not a ComponentLog");
+ }
+
+ kerberosUser = new KerberosPasswordUser(principal, password);
+ login = new KerberosUserLogin(kerberosUser, (ComponentLog) loggerObject);
+
+ try {
+ login.login();
+ } catch (LoginException e) {
+ LOGGER.error("Failed to login as principal `{}`", new Object[]{principal}, e);
+ }
+ }
+
+ @Override
+ public void close() {
+ try {
+ kerberosUser.logout();
+ } catch (Throwable t) {
+ LOGGER.error("Error performing logout of principal during close(): " + t.getMessage(), t);
+ } finally {
+ kerberosUser = null;
+ }
+
+ super.close();
+ }
+}