You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/07/22 18:09:19 UTC

[nifi] branch main updated: NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f78a983  NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.
f78a983 is described below

commit f78a983bead42249294fc91c2e704578b87a46b2
Author: Knowles Atchison Jr <ka...@gmail.com>
AuthorDate: Wed Jul 21 14:12:42 2021 -0400

    NIFI-8936 Added dynamic http header support to Confluent Schema Registry controller.
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #5237.
---
 .../schemaregistry/ConfluentSchemaRegistry.java    | 44 +++++++++++++++++++++-
 .../client/RestSchemaRegistryClient.java           | 21 +++++++++--
 .../ConfluentSchemaRegistryTest.java               | 20 ++++++++++
 3 files changed, 80 insertions(+), 5 deletions(-)

diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
index ed1b196..65d5b9d 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistry.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Set;
@@ -32,6 +33,7 @@ import java.util.stream.Stream;
 import javax.net.ssl.SSLContext;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -39,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.confluent.schemaregistry.client.AuthenticationType;
 import org.apache.nifi.confluent.schemaregistry.client.CachingSchemaRegistryClient;
 import org.apache.nifi.confluent.schemaregistry.client.RestSchemaRegistryClient;
@@ -59,11 +62,15 @@ import org.apache.nifi.ssl.SSLContextService;
 @CapabilityDescription("Provides a Schema Registry that interacts with the Confluent Schema Registry so that those Schemas that are stored in the Confluent Schema "
     + "Registry can be used in NiFi. The Confluent Schema Registry has a notion of a \"subject\" for schemas, which is their terminology for a schema name. When a Schema "
     + "is looked up by name by this registry, it will find a Schema in the Confluent Schema Registry with that subject.")
+@DynamicProperty(name = "request.header.*", value = "String literal, may not be empty", description = "Properties that begin with 'request.header.' " +
+        "are populated into a map and passed as http headers in REST requests to the Confluent Schema Registry")
 public class ConfluentSchemaRegistry extends AbstractControllerService implements SchemaRegistry {
 
     private static final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_NAME, SchemaField.SCHEMA_TEXT,
         SchemaField.SCHEMA_TEXT_FORMAT, SchemaField.SCHEMA_IDENTIFIER, SchemaField.SCHEMA_VERSION);
 
+    private static final String REQUEST_HEADER_PREFIX = "request.header.";
+
 
     static final PropertyDescriptor SCHEMA_REGISTRY_URLS = new PropertyDescriptor.Builder()
         .name("url")
@@ -158,6 +165,28 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
         return properties;
     }
 
+    // Accepts a dynamic property only when its name is 'request.header.' followed by at least one character.
+    private static final Validator REQUEST_HEADER_VALIDATOR = (subject, value, context) -> {
+        final boolean hasHeaderName = subject.startsWith(REQUEST_HEADER_PREFIX)
+                && subject.length() > REQUEST_HEADER_PREFIX.length();
+
+        return new ValidationResult.Builder()
+                .subject(subject)
+                .input(value)
+                .valid(hasHeaderName)
+                .explanation("Dynamic property names must be of format 'request.header.*'")
+                .build();
+    };
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(String propertyDescriptionName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptionName)
+                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+                .addValidator(REQUEST_HEADER_VALIDATOR)
+                .build();
+    }
+
     @OnEnabled
     public void onEnabled(final ConfigurationContext context) {
         final List<String> baseUrls = getBaseURLs(context);
@@ -173,7 +202,20 @@ public class ConfluentSchemaRegistry extends AbstractControllerService implement
 
         final String username = context.getProperty(USERNAME).getValue();
         final String password = context.getProperty(PASSWORD).getValue();
-        final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis, sslContext, username, password, getLogger());
+
+        // Build the HTTP header map: each key is the dynamic property name with the
+        // 'request.header.' prefix removed, and each value is that property's configured value.
+        final Map<String, String> httpHeaders =
+                context.getProperties().entrySet()
+                        .stream()
+                        .filter(e -> e.getKey().getName().startsWith(REQUEST_HEADER_PREFIX))
+                        .collect(Collectors.toMap(
+                                map -> map.getKey().getName().substring(REQUEST_HEADER_PREFIX.length()),
+                                Map.Entry::getValue)
+                        );
+
+        final SchemaRegistryClient restClient = new RestSchemaRegistryClient(baseUrls, timeoutMillis,
+                sslContext, username, password, getLogger(), httpHeaders);
 
         final int cacheSize = context.getProperty(CACHE_SIZE).asInteger();
         final long cacheExpiration = context.getProperty(CACHE_EXPIRATION).asTimePeriod(TimeUnit.NANOSECONDS).longValue();
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
index 6bf2c20..318d2d7 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/main/java/org/apache/nifi/confluent/schemaregistry/client/RestSchemaRegistryClient.java
@@ -36,6 +36,7 @@ import org.glassfish.jersey.client.authentication.HttpAuthenticationFeature;
 import javax.net.ssl.SSLContext;
 import javax.ws.rs.client.Client;
 import javax.ws.rs.client.Entity;
+import javax.ws.rs.client.Invocation;
 import javax.ws.rs.client.WebTarget;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -44,6 +45,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 
 /**
@@ -61,6 +63,7 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
     private final List<String> baseUrls;
     private final Client client;
     private final ComponentLog logger;
+    private final Map<String, String> httpHeaders;
 
     private static final String SUBJECT_FIELD_NAME = "subject";
     private static final String VERSION_FIELD_NAME = "version";
@@ -75,8 +78,10 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
                                     final SSLContext sslContext,
                                     final String username,
                                     final String password,
-                                    final ComponentLog logger) {
+                                    final ComponentLog logger,
+                                    final Map<String, String> httpHeaders) {
         this.baseUrls = new ArrayList<>(baseUrls);
+        this.httpHeaders = httpHeaders;
 
         final ClientConfig clientConfig = new ClientConfig();
         clientConfig.property(ClientProperties.CONNECT_TIMEOUT, timeoutMillis);
@@ -191,8 +196,12 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
             final String path = getPath(pathSuffix);
             final String trimmedBase = getTrimmedBase(baseUrl);
             final String url = trimmedBase + path;
-            final WebTarget builder = client.target(url);
-            final Response response = builder.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE).post(Entity.json(schema.toString()));
+            final WebTarget webTarget = client.target(url);
+            Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON).header(CONTENT_TYPE_HEADER, SCHEMA_REGISTRY_CONTENT_TYPE);
+            for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
+                builder = builder.header(header.getKey(), header.getValue());
+            }
+            final Response response = builder.post(Entity.json(schema.toString()));
             final int responseCode = response.getStatus();
 
             if (responseCode == Response.Status.NOT_FOUND.getStatusCode()) {
@@ -216,7 +225,11 @@ public class RestSchemaRegistryClient implements SchemaRegistryClient {
             final String url = trimmedBase + path;
 
             final WebTarget webTarget = client.target(url);
-            final Response response = webTarget.request().accept(MediaType.APPLICATION_JSON).get();
+            Invocation.Builder builder = webTarget.request().accept(MediaType.APPLICATION_JSON);
+            for (Map.Entry<String, String> header : httpHeaders.entrySet()) {
+                builder = builder.header(header.getKey(), header.getValue());
+            }
+            final Response response = builder.get();
             final int responseCode = response.getStatus();
 
             if (responseCode == Response.Status.OK.getStatusCode()) {
diff --git a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
index 58789a5..cb7b8ba 100644
--- a/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
+++ b/nifi-nar-bundles/nifi-confluent-platform-bundle/nifi-confluent-schema-registry-service/src/test/java/org/apache/nifi/confluent/schemaregistry/ConfluentSchemaRegistryTest.java
@@ -81,4 +81,24 @@ public class ConfluentSchemaRegistryTest {
         runner.assertValid(registry);
         runner.enableControllerService(registry);
     }
+
+    @Test
+    public void testValidateAndEnableDynamicHttpHeaderProperties() {
+        runner.setProperty(registry, "request.header.User", "kafkaUser"); // prefix followed by header name "User"
+        runner.setProperty(registry, "request.header.Test", "testValue"); // prefix followed by header name "Test"
+        runner.assertValid(registry); // both well-formed dynamic properties must pass validation
+        runner.enableControllerService(registry);
+    }
+
+    @Test
+    public void testValidateDynamicHttpHeaderPropertiesMissingTrailingValue() {
+        runner.setProperty(registry, "request.header.", "NotValid"); // bare prefix: no header name follows it
+        runner.assertNotValid(registry);
+    }
+
+    @Test
+    public void testValidateDynamicHttpHeaderPropertiesInvalidSubject() {
+        runner.setProperty(registry, "not.valid.subject", "NotValid"); // name lacks the 'request.header.' prefix
+        runner.assertNotValid(registry);
+    }
 }