You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/07/22 18:09:19 UTC
[nifi] branch main updated: NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f78a983 NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.
f78a983 is described below
commit f78a983bead42249294fc91c2e704578b87a46b2
Author: Knowles Atchison Jr <ka...@gmail.com>
AuthorDate: Wed Jul 21 14:12:42 2021 -0400
NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #5237.
---
.../schemaregistry/ConfluentSchemaRegistry.java | 44 +++++++++++++++++++++-
.../client/RestSchemaRegistryClient.java | 21 +++++++++--
.../ConfluentSchemaRegistryTest.java | 20 ++++++++++
3 files changed, 80 insertions(+), 5 deletions(-)
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index ed1b196..65d5b9d 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
@@ -32,6 +33,7 @@ import java.util.stream.Stream;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -39,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient;
import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient;
@@ -59,11 +62,15 @@ import org.apache.nifi.ssl.SSLContextService;
@CapabilityDescription("Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema "
+ "Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema "
+ "is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.")
+@DynamicProperty(name = "request.header.*", value = "String literal, may not be empty", description = "Properties that begin with 'request.header.' " +
+ "are populated into a map and passed as http headers in REST requests to the Confluent Schema Registry")
public class ConfluentSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
+ private static final String REQUEST_HEADER_PREFIX = "request.header.";
+
static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder()
.name("url")
@@ -158,6 +165,28 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
return properties;
}
+ private static final Validator REQUEST_HEADER_VALIDATOR = new Validator() { // validates the dynamic property's name (subject); the value is only echoed into the result
+ @Override
+ public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(value)
+ .valid(subject.startsWith(REQUEST_HEADER_PREFIX) // name must begin with 'request.header.' ...
+ && subject.length() > REQUEST_HEADER_PREFIX.length()) // ... and have at least one character after the prefix
+ .explanation("Dynamic property names must be of format 'request.header.*'")
+ .build();
+ }
+ };
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptionName) { // builds the descriptor for a dynamic property with the given name
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptionName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) // presumably rejects empty values, matching the "may not be empty" wording on @DynamicProperty - confirm
+ .addValidator(REQUEST_HEADER_VALIDATOR) // enforces the 'request.header.*' naming format
+ .build();
+ }
+
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
final List<String> baseUrls = getBaseURLs(context);
@@ -173,7 +202,20 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
final String username = context.getProperty(USERNAME).getValue();
final String password = context.getProperty(PASSWORD).getValue();
- final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, getLogger());
+
+ // generate a map of http headers where the key is the remainder of the property name after
+ // the request header prefix
+ final Map<String, String> httpHeaders =
+ context.getProperties().entrySet()
+ .stream()
+ .filter(e -> e.getKey().getName().startsWith(REQUEST_HEADER_PREFIX))
+ .collect(Collectors.toMap(
+ map -> map.getKey().getName().substring(REQUEST_HEADER_PREFIX.length()),
+ Map.Entry::getValue)
+ );
+
+ final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis,
+ sslContext, username, password, getLogger(), httpHeaders);
final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 6bf2c20..318d2d7 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -36,6 +36,7 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
import javax.net.ssl.SSLContext;
import javax.ws.rs.client.Client;
import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
@@ -44,6 +45,7 @@ import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
@@ -61,6 +63,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
private final List<String> baseUrls;
private final Client client;
private final ComponentLog logger;
+ private final Map<String, String> httpHeaders;
private static final String SUBJECT_FIELD_NAME = "subject";
private static final String VERSION_FIELD_NAME = "version";
@@ -75,8 +78,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final SSLContext sslContext,
final String username,
final String password,
- final ComponentLog logger) {
+ final ComponentLog logger,
+ final Map<String, String> httpHeaders) {
this.baseUrls = new ArrayList<>(baseUrls);
+ this.httpHeaders = httpHeaders;
final ClientConfig clientConfig = new ClientConfig();
clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis);
@@ -191,8 +196,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String path = getPath(pathSuffix);
final String trimmedBase = getTrimmedBase(baseUrl);
final String url = trimmedBase + path;
- final WebTarget builder = client.target(url);
- final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString()));
+ final WebTarget webTarget = client.target(url);
+ Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE);
+ for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
+ builder = builder.header(header.getKey(), header.getValue());
+ }
+ final Response response = builder.post(Entity.json(schema.toString()));
final int responseCode = response.getStatus();
if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
@@ -216,7 +225,11 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
final String url = trimmedBase + path;
final WebTarget webTarget = client.target(url);
- final Response response = webTarget.request().accept(MediaType.APPLICATION_JSON).get();
+ Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON);
+ for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
+ builder = builder.header(header.getKey(), header.getValue());
+ }
+ final Response response = builder.get();
final int responseCode = response.getStatus();
if (responseCode == Response.Status.OK.getStatusCode()) {
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
index 58789a5..cb7b8ba 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
@@ -81,4 +81,24 @@ public class ConfluentSchemaRegistryTest {
runner.assertValid(registry);
runner.enableControllerService(registry);
}
+
+ @Test
+ public void testValidateAndEnableDynamicHttpHeaderProperties() { // well-formed 'request.header.*' properties should validate and allow the service to be enabled
+ runner.setProperty(registry, "request.header.User", "kafkaUser"); // prefix followed by a header name, with a non-empty value
+ runner.setProperty(registry, "request.header.Test", "testValue");
+ runner.assertValid(registry);
+ runner.enableControllerService(registry);
+ }
+
+ @Test
+ public void testValidateDynamicHttpHeaderPropertiesMissingTrailingValue() { // a property name that is only the prefix must be rejected
+ runner.setProperty(registry, "request.header.", "NotValid"); // nothing follows 'request.header.' in the property name
+ runner.assertNotValid(registry);
+ }
+
+ @Test
+ public void testValidateDynamicHttpHeaderPropertiesInvalidSubject() { // a dynamic property outside the 'request.header.*' format must be rejected
+ runner.setProperty(registry, "not.valid.subject", "NotValid"); // property name does not start with 'request.header.'
+ runner.assertNotValid(registry);
+ }
}