You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jg...@apache.org on 2022/11/23 10:04:45 UTC

[nifi] branch main updated: NIFI-10866 Refactored Kafka 1.0 and 2.0 using nifi-kafka-shared

This is an automated email from the ASF dual-hosted git repository.

jgresock pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9cc9692c98 NIFI-10866 Refactored Kafka 1.0 and 2.0 using nifi-kafka-shared
9cc9692c98 is described below

commit 9cc9692c9846b5b837f9905b7b3c8638460736f6
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Tue Nov 22 16:08:22 2022 -0600

    NIFI-10866 Refactored Kafka 1.0 and 2.0 using nifi-kafka-shared
    
    Signed-off-by: Joe Gresock <jg...@gmail.com>
    
    This closes #6710.
---
 .../nifi-kafka-1-0-processors/pom.xml              |  13 +
 .../kafka/pubsub/ConsumeKafkaRecord_1_0.java       |  50 +-
 .../processors/kafka/pubsub/ConsumeKafka_1_0.java  |  58 ++-
 .../processors/kafka/pubsub/ConsumerLease.java     |  58 +--
 .../kafka/pubsub/KafkaProcessorUtils.java          | 405 ---------------
 .../kafka/pubsub/PublishKafkaRecord_1_0.java       |  72 ++-
 .../processors/kafka/pubsub/PublishKafka_1_0.java  |  63 +--
 .../record/sink/kafka/KafkaRecordSink_1_0.java     | 131 +----
 .../processors/kafka/pubsub/ConsumeKafkaTest.java  |  43 +-
 .../processors/kafka/pubsub/ConsumerPoolTest.java  |  15 +-
 .../processors/kafka/pubsub/ITConsumeKafka.java    |  12 +-
 .../kafka/pubsub/TestConsumeKafkaRecord_1_0.java   |  33 +-
 .../record/sink/kafka/TestKafkaRecordSink_1_0.java |  12 +-
 .../src/test/resources/log4j.properties            |  21 -
 .../src/test/resources/zookeeper.properties        |  20 -
 .../nifi-kafka-2-0-processors/pom.xml              |  14 +-
 .../kafka/pubsub/ConsumeKafkaRecord_2_0.java       |  61 +--
 .../processors/kafka/pubsub/ConsumeKafka_2_0.java  |  65 ++-
 .../processors/kafka/pubsub/ConsumerLease.java     |  41 +-
 .../kafka/pubsub/KafkaProcessorUtils.java          | 557 ---------------------
 .../kafka/pubsub/PublishKafkaRecord_2_0.java       |  83 ++-
 .../processors/kafka/pubsub/PublishKafka_2_0.java  |  73 +--
 .../record/sink/kafka/KafkaRecordSink_2_0.java     | 128 +----
 .../kafka/pubsub/ITConsumeKafka_2_0.java           |  12 +-
 .../kafka/pubsub/TestConsumeKafkaRecord_2_0.java   |  64 ++-
 .../kafka/pubsub/TestConsumeKafka_2_0.java         |  43 +-
 .../kafka/pubsub/TestConsumerPartitionsUtil.java   |   2 +-
 .../kafka/pubsub/TestPublishKafkaRecord_2_0.java   |   9 +-
 .../kafka/pubsub/TestPublishKafka_2_0.java         |   7 +-
 .../record/sink/kafka/TestKafkaRecordSink_2_0.java |  14 +-
 .../src/test/resources/log4j.properties            |  21 -
 31 files changed, 515 insertions(+), 1685 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
index ae44837c1f..de3ff83cae 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/pom.xml
@@ -56,6 +56,19 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-credentials-service-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kafka-shared</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
index 28a582c08b..1dbe5ed3dd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
@@ -20,7 +20,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +45,12 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -67,9 +72,9 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 @WritesAttributes({
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic records are from")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
@@ -78,7 +83,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 @SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class})
-public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
+public class ConsumeKafkaRecord_1_0 extends AbstractProcessor implements KafkaClientComponent {
 
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@@ -216,18 +221,19 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        descriptors.add(BOOTSTRAP_SERVERS);
         descriptors.add(TOPICS);
         descriptors.add(TOPIC_TYPE);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(HONOR_TRANSACTIONS);
-        descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
-        descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
-        descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
-        descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
-        descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        descriptors.add(SECURITY_PROTOCOL);
+        descriptors.add(SASL_MECHANISM);
+        descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
+        descriptors.add(KERBEROS_SERVICE_NAME);
+        descriptors.add(KERBEROS_PRINCIPAL);
+        descriptors.add(KERBEROS_KEYTAB);
+        descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(MESSAGE_HEADER_ENCODING);
@@ -267,7 +273,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName)
-                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+                .addValidator(new DynamicPropertyValidator(ConsumerConfig.class))
                 .dynamic(true)
                 .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
@@ -275,7 +281,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        return new KafkaClientCustomValidationFunction().apply(validationContext);
     }
 
     private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -291,16 +297,16 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
         final int maxLeases = context.getMaxConcurrentTasks();
         final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
-        final Map<String, Object> props = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
+        final KafkaPropertyProvider kafkaPropertyProvider = new StandardKafkaPropertyProvider(ConsumerConfig.class);
+        final Map<String, Object> props = kafkaPropertyProvider.getProperties(context);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         final String topicListing = context.getProperty(ConsumeKafkaRecord_1_0.TOPICS).evaluateAttributeExpressions().getValue();
         final String topicType = context.getProperty(ConsumeKafkaRecord_1_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
@@ -327,7 +333,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topicPattern, maxUncommittedTime, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern);
         } else {
-            getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+            getLogger().error("Subscription type has an unknown value {}", topicType);
             return null;
         }
     }
@@ -352,12 +358,12 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
         if (!activeLeases.isEmpty()) {
             int count = 0;
             for (final ConsumerLease lease : activeLeases) {
-                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", lease);
                 lease.wakeup();
                 count++;
             }
 
-            getLogger().info("Woke up {} consumers", new Object[] {count});
+            getLogger().info("Woke up {} consumers", count);
         }
 
         activeLeases.clear();
@@ -387,7 +393,7 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
                 }
             } catch (final WakeupException we) {
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
-                    + "Will roll back session and discard any partially received data.", new Object[] {lease});
+                    + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
                         new Object[]{lease, kex}, kex);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
index 511d85fe7f..772f6cfa5e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
@@ -21,7 +21,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -46,6 +45,13 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -53,20 +59,18 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 1.0 Consumer API. "
     + "The complementary NiFi processor for sending messages is PublishKafka_1_0.")
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "1.0"})
 @WritesAttributes({
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_COUNT, description = "The number of messages written if more than one"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_KEY, description = "The key of message if present and if single message. "
             + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
@@ -74,7 +78,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
-public class ConsumeKafka_1_0 extends AbstractProcessor {
+public class ConsumeKafka_1_0 extends AbstractProcessor implements KafkaClientComponent {
 
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
@@ -126,10 +130,10 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
             .name("key-attribute-encoding")
             .displayName("Key Attribute Encoding")
-            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
             .required(true)
-            .defaultValue(UTF8_ENCODING.getValue())
-            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .defaultValue(KeyEncoding.UTF8.getValue())
+            .allowableValues(KeyEncoding.class)
             .build();
 
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
@@ -217,7 +221,13 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        descriptors.add(BOOTSTRAP_SERVERS);
+        descriptors.add(SECURITY_PROTOCOL);
+        descriptors.add(SASL_MECHANISM);
+        descriptors.add(KERBEROS_SERVICE_NAME);
+        descriptors.add(KERBEROS_PRINCIPAL);
+        descriptors.add(KERBEROS_KEYTAB);
+        descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(TOPICS);
         descriptors.add(TOPIC_TYPE);
         descriptors.add(HONOR_TRANSACTIONS);
@@ -257,7 +267,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName)
-                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+                .addValidator(new DynamicPropertyValidator(ConsumerConfig.class))
                 .dynamic(true)
                 .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
@@ -265,7 +275,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        return new KafkaClientCustomValidationFunction().apply(validationContext);
     }
 
     private synchronized ConsumerPool getConsumerPool(final ProcessContext context) {
@@ -283,8 +293,8 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
         final byte[] demarcator = context.getProperty(ConsumeKafka_1_0.MESSAGE_DEMARCATOR).isSet()
                 ? context.getProperty(ConsumeKafka_1_0.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                 : null;
-        final Map<String, Object> props = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
+        final KafkaPropertyProvider kafkaPropertyProvider = new StandardKafkaPropertyProvider(ConsumerConfig.class);
+        final Map<String, Object> props = kafkaPropertyProvider.getProperties(context);
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
@@ -293,8 +303,8 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
         final String topicType = context.getProperty(ConsumeKafka_1_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean();
 
         final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
@@ -318,7 +328,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
             return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern);
         } else {
-            getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+            getLogger().error("Subscription type has an unknown value {}", topicType);
             return null;
         }
     }
@@ -343,12 +353,12 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
         if (!activeLeases.isEmpty()) {
             int count = 0;
             for (final ConsumerLease lease : activeLeases) {
-                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", lease);
                 lease.wakeup();
                 count++;
             }
 
-            getLogger().info("Woke up {} consumers", new Object[] {count});
+            getLogger().info("Woke up {} consumers", count);
         }
 
         activeLeases.clear();
@@ -378,7 +388,7 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
                 }
             } catch (final WakeupException we) {
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
-                    + "Will roll back session and discard any partially received data.", new Object[] {lease});
+                    + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
                         new Object[]{lease, kex}, kex);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 12aca509f0..6279b6ea49 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -27,6 +27,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -60,8 +63,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_PARSE_FAILURE;
 import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_1_0.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -142,7 +143,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      */
     @Override
     public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
-        logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", partitions, this, kafkaConsumer);
         //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
         commit();
     }
@@ -156,7 +157,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      */
     @Override
     public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
-        logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", partitions, this, kafkaConsumer);
     }
 
     /**
@@ -191,7 +192,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      * higher performance than the other commitOffsets call as it allows the
      * kafka client to collect more data from Kafka before committing the
      * offsets.
-     *
      * if false then we didn't do anything and should probably yield if true
      * then we committed new data
      *
@@ -317,12 +317,12 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     public abstract void yield();
 
     private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
-        records.partitions().stream().forEach(partition -> {
+        records.partitions().forEach(partition -> {
             List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
             if (!messages.isEmpty()) {
                 //update maximum offset map for this topic partition
                 long maxOffset = messages.stream()
-                        .mapToLong(record -> record.offset())
+                        .mapToLong(ConsumerRecord::offset)
                         .max()
                         .getAsLong();
 
@@ -332,9 +332,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 } else if (readerFactory != null && writerFactory != null) {
                     writeRecordData(getProcessSession(), messages, partition);
                 } else {
-                    messages.stream().forEach(message -> {
-                        writeData(getProcessSession(), message, partition);
-                    });
+                    messages.forEach(message -> writeData(getProcessSession(), message, partition));
                 }
 
                 totalMessages += messages.size();
@@ -348,9 +346,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             return null;
         }
 
-        if (HEX_ENCODING.getValue().equals(encoding)) {
+        if (KeyEncoding.HEX.getValue().equals(encoding)) {
             return DatatypeConverter.printHexBinary(key);
-        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+        } else if (KeyEncoding.UTF8.getValue().equals(encoding)) {
             return new String(key, StandardCharsets.UTF_8);
         } else {
             return null;  // won't happen because it is guaranteed by the Allowable Values
@@ -384,8 +382,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 return false;
             }
 
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.putAll(writeResult.getAttributes());
+            final Map<String, String> attributes = new HashMap<>(writeResult.getAttributes());
             attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
 
             bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
@@ -401,9 +398,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         tracker.incrementRecordCount(1);
         final byte[] value = record.value();
         if (value != null) {
-            flowFile = session.write(flowFile, out -> {
-                out.write(value);
-            });
+            flowFile = session.write(flowFile, out -> out.write(value));
         }
         flowFile = session.putAllAttributes(flowFile, getAttributes(record));
         tracker.updateFlowFile(flowFile);
@@ -464,10 +459,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
         // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
         final Map<String, String> attributes = getAttributes(consumerRecord);
-        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
-        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+        attributes.put(KafkaFlowFileAttribute.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaFlowFileAttribute.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
+        attributes.put(KafkaFlowFileAttribute.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.topic());
 
         FlowFile failureFlowFile = session.create();
 
@@ -477,7 +472,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         }
         failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+        final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, consumerRecord.topic());
         session.getProvenanceReporter().receive(failureFlowFile, transitUri);
 
         session.transfer(failureFlowFile, REL_PARSE_FAILURE);
@@ -525,7 +520,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                     try {
                         reader = readerFactory.createRecordReader(attributes, in, recordBytes.length, logger);
                     } catch (final IOException e) {
-                        yield();
+                        this.yield();
                         rollback(topicPartition);
                         handleParseFailure(consumerRecord, session, e, "Failed to parse message from Kafka due to comms failure. Will roll back session and try again momentarily.");
                         closeWriter(writer);
@@ -556,7 +551,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                                     logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
 
                                     rollback(topicPartition);
-                                    yield();
+                                    this.yield();
 
                                     throw new ProcessException(e);
                                 }
@@ -584,7 +579,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         }
                     } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
                         handleParseFailure(consumerRecord, session, e);
-                        continue;
                     }
                 }
             }
@@ -626,25 +620,25 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
         if (tracker.key != null && tracker.totalRecords == 1) {
-            kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+            kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_KEY, tracker.key);
         }
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
             // Add a record.count attribute to remain consistent with other record-oriented processors. If not
             // reading/writing records, then use "kafka.count" attribute.
             if (tracker.recordWriter == null) {
-                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+                kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
             } else {
                 kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
             }
         }
         final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
         final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, tracker.topic);
         getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
         tracker.updateFlowFile(newFlowFile);
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
deleted file mode 100644
index 322d77efea..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.components.resource.ResourceCardinality;
-import org.apache.nifi.components.resource.ResourceType;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.FormatUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-
-public final class KafkaProcessorUtils {
-    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
-
-    final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-
-    static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
-
-    static final String KAFKA_KEY = "kafka.key";
-    static final String KAFKA_TOPIC = "kafka.topic";
-    static final String KAFKA_PARTITION = "kafka.partition";
-    static final String KAFKA_OFFSET = "kafka.offset";
-    static final String KAFKA_TIMESTAMP = "kafka.timestamp";
-    static final String KAFKA_COUNT = "kafka.count";
-    public static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
-    public static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
-    public static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
-    public static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
-
-    public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
-            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
-            .displayName("Kafka Brokers")
-            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
-            .required(true)
-            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .defaultValue("localhost:9092")
-            .build();
-    public static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
-            .name("security.protocol")
-            .displayName("Security Protocol")
-            .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
-            .defaultValue(SEC_PLAINTEXT.getValue())
-            .build();
-    public static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
-            .name("sasl.kerberos.service.name")
-            .displayName("Kerberos Service Name")
-            .description("The service name that matches the primary name of the Kafka server configured in the broker JAAS file."
-                    + "This can be defined either in Kafka's JAAS config or in Kafka's config. "
-                    + "Corresponds to Kafka's 'security.protocol' property."
-                    + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
-            .name("sasl.kerberos.principal")
-            .displayName("Kerberos Principal")
-            .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
-                    + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
-            .name("sasl.kerberos.keytab")
-            .displayName("Kerberos Keytab")
-            .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
-                    + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
-            .required(false)
-            .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("ssl.context.service")
-            .displayName("SSL Context Service")
-            .description("Specifies the SSL Context Service to use for communicating with Kafka.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
-        .name("kerberos-credentials-service")
-        .displayName("Kerberos Credentials Service")
-        .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
-        .identifiesControllerService(KerberosCredentialsService.class)
-        .required(false)
-        .build();
-
-    static List<PropertyDescriptor> getCommonPropertyDescriptors() {
-        return Arrays.asList(
-                BOOTSTRAP_SERVERS,
-                SECURITY_PROTOCOL,
-                JAAS_SERVICE_NAME,
-                KERBEROS_CREDENTIALS_SERVICE,
-                USER_PRINCIPAL,
-                USER_KEYTAB,
-                SSL_CONTEXT_SERVICE
-        );
-    }
-
-    public static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) {
-        List<ValidationResult> results = new ArrayList<>();
-
-        String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
-
-        final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-        final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-
-        final String resolvedPrincipal;
-        final String resolvedKeytab;
-        if (credentialsService == null) {
-            resolvedPrincipal = explicitPrincipal;
-            resolvedKeytab = explicitKeytab;
-        } else {
-            resolvedPrincipal = credentialsService.getPrincipal();
-            resolvedKeytab = credentialsService.getKeytab();
-        }
-
-        if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
-            results.add(new ValidationResult.Builder()
-                .subject("Kerberos Credentials")
-                .valid(false)
-                .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
-                .build());
-        }
-
-        final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
-        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
-            results.add(new ValidationResult.Builder()
-                .subject("Kerberos Credentials")
-                .valid(false)
-                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
-                    + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
-                .build());
-        }
-
-        // validates that if one of SASL (Kerberos) option is selected for
-        // security protocol, then Kerberos principal is provided as well
-        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-            if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
-                results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
-                    .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <"
-                        + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
-                        + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
-                    .build());
-            }
-
-            if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) {
-                results.add(new ValidationResult.Builder()
-                    .subject(JAAS_SERVICE_NAME.getDisplayName())
-                    .valid(false)
-                    .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
-                        + "must be set or neither must be set.")
-                    .build());
-            }
-        }
-
-        // If SSL or SASL_SSL then SSLContext Controller Service must be set.
-        final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
-        final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
-        if (csSet && !sslProtocol) {
-            results.add(new ValidationResult.Builder()
-                .subject(SECURITY_PROTOCOL.getDisplayName())
-                .valid(false)
-                .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.")
-                .build());
-        }
-
-        if (!csSet && sslProtocol) {
-            results.add(new ValidationResult.Builder()
-                .subject(SSL_CONTEXT_SERVICE.getDisplayName())
-                .valid(false)
-                .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service")
-                .build());
-        }
-
-        final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
-        if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) {
-            results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
-                .explanation("Enable auto commit must be false. It is managed by the processor.").build());
-        }
-
-        final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
-            results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
-        }
-
-        final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
-            results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
-        }
-
-        final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
-            results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
-                .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
-        }
-
-        final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
-            results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
-                .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
-        }
-
-        return results;
-    }
-
-    public static final class KafkaConfigValidator implements Validator {
-
-        final Class<?> classType;
-
-        public KafkaConfigValidator(final Class<?> classType) {
-            this.classType = classType;
-        }
-
-        @Override
-        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
-            final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
-            return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build();
-        }
-    }
-
-    /**
-     * Builds transit URI for provenance event. The transit URI will be in the
-     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
-     */
-    static String buildTransitURI(String securityProtocol, String brokers, String topic) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(securityProtocol);
-        builder.append("://");
-        builder.append(brokers);
-        builder.append("/");
-        builder.append(topic);
-        return builder.toString();
-    }
-
-
-    static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
-        for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
-            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
-                // Translate SSLContext Service configuration into Kafka properties
-                final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-                if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
-                    final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
-                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
-                }
-
-                if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
-                }
-            }
-
-            String propertyName = propertyDescriptor.getName();
-            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
-                    ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
-                    : context.getProperty(propertyDescriptor).getValue();
-
-            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
-                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
-                // or the standard NiFi time period such as "5 secs"
-                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
-                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
-                }
-
-                if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(propertyName, propertyValue);
-                }
-            }
-        }
-
-        String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
-        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            setJaasConfig(mapToPopulate, context);
-        }
-    }
-
-    /**
-     * Method used to create a transactional id Supplier for KafkaProducer
-     *
-     * @param prefix String transactional id prefix, can be null
-     * @return A Supplier that generates transactional id
-     */
-    public static Supplier<String> getTransactionalIdSupplier(String prefix) {
-        return () -> (prefix == null ? "" : prefix)  + UUID.randomUUID().toString();
-    }
-
-    /**
-     * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
-     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
-     * <br />
-     * It expects something with the following format: <br />
-     * <br />
-     * &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; *(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
-     * ControlFlag = required / requisite / sufficient / optional
-     *
-     * @param mapToPopulate Map of configuration properties
-     * @param context Context
-     */
-    private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
-        String keytab = context.getProperty(USER_KEYTAB) == null ? null : context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-        String principal = context.getProperty(USER_PRINCIPAL) == null ? null : context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-
-        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
-        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
-        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-        if (credentialsService != null) {
-            principal = credentialsService.getPrincipal();
-            keytab = credentialsService.getKeytab();
-        }
-
-
-        String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-        if(StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
-            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
-                    + "useTicketCache=false "
-                    + "renewTicket=true "
-                    + "serviceName=\"" + serviceName + "\" "
-                    + "useKeyTab=true "
-                    + "keyTab=\"" + keytab + "\" "
-                    + "principal=\"" + principal + "\";");
-        }
-    }
-
-    public static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
-        return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
-    }
-
-    private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
-        final Set<String> strings = new HashSet<>();
-        for (final Class<?> classType : classes) {
-            for (final Field field : classType.getDeclaredFields()) {
-                if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
-                    try {
-                        strings.add(String.valueOf(field.get(null)));
-                    } catch (IllegalArgumentException | IllegalAccessException ex) {
-                        //ignore
-                    }
-                }
-            }
-        }
-        return strings;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index 1edd5bac67..69805eb706 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -32,13 +32,19 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.record.path.RecordPath;
@@ -54,13 +60,10 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -88,7 +91,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success.")
 @SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, ConsumeKafkaRecord_1_0.class})
-public class PublishKafkaRecord_1_0 extends AbstractProcessor {
+public class PublishKafkaRecord_1_0 extends AbstractProcessor implements KafkaClientComponent {
     protected static final String MSG_COUNT = "msg.count";
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
@@ -115,10 +118,6 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
         "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
             "so all Records in a given FlowFile will go to the same partition.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
         .name("topic")
         .displayName("Topic Name")
@@ -282,7 +281,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        properties.add(BOOTSTRAP_SERVERS);
         properties.add(TOPIC);
         properties.add(RECORD_READER);
         properties.add(RECORD_WRITER);
@@ -291,12 +290,12 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
         properties.add(DELIVERY_GUARANTEE);
         properties.add(ATTRIBUTE_NAME_REGEX);
         properties.add(MESSAGE_HEADER_ENCODING);
-        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
-        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
-        properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
-        properties.add(KafkaProcessorUtils.USER_KEYTAB);
-        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        properties.add(SECURITY_PROTOCOL);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_SERVICE_NAME);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_KEYTAB);
+        properties.add(SSL_CONTEXT_SERVICE);
         properties.add(MESSAGE_KEY_FIELD);
         properties.add(MAX_REQUEST_SIZE);
         properties.add(ACK_WAIT_TIME);
@@ -328,7 +327,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
         return new PropertyDescriptor.Builder()
             .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
             .name(propertyDescriptorName)
-            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
             .dynamic(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
@@ -336,8 +335,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+        final List<ValidationResult> results = new ArrayList<>(new KafkaClientCustomValidationFunction().apply(validationContext));
 
         final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
         if (useTransactions) {
@@ -392,19 +390,19 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
 
     protected PublisherPool createPublisherPool(final ProcessContext context) {
         final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
-        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
         final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
         final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
         final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
-        Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
+        Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
 
         final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
         final Charset charset = Charset.forName(charsetName);
 
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> kafkaProperties = propertyProvider.getProperties(context);
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
@@ -434,8 +432,8 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
             return;
         }
 
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
@@ -470,24 +468,20 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
                 final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
 
                 try {
-                    session.read(flowFile, new InputStreamCallback() {
-                        @Override
-                        public void process(final InputStream in) throws IOException {
-                            try {
-                                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
-                                final RecordSet recordSet = reader.createRecordSet();
-
-                                final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
-                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
-                            } catch (final SchemaNotFoundException | MalformedRecordException e) {
-                                throw new ProcessException(e);
-                            }
+                    session.read(flowFile, in -> {
+                        try {
+                            final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+                            final RecordSet recordSet = reader.createRecordSet();
+
+                            final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
+                            lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
+                        } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                            throw new ProcessException(e);
                         }
                     });
                 } catch (final Exception e) {
                     // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
                     lease.fail(flowFile, e);
-                    continue;
                 }
             }
 
@@ -509,7 +503,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
                 success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                 session.adjustCounter("Messages Sent", msgCount, true);
 
-                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, topic);
                 session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                 session.transfer(success, REL_SUCCESS);
             }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
index a0e0ecb169..6de516be7d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
@@ -31,26 +31,32 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import javax.xml.bind.DatatypeConverter;
 import java.io.BufferedInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -76,7 +82,7 @@ import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIB
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
     + "be greater than 1.")
-public class PublishKafka_1_0 extends AbstractProcessor {
+public class PublishKafka_1_0 extends AbstractProcessor implements KafkaPublishComponent {
     protected static final String MSG_COUNT = "msg.count";
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
@@ -100,10 +106,6 @@ public class PublishKafka_1_0 extends AbstractProcessor {
         "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
             "so all Records in a given FlowFile will go to the same partition.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
         .name("topic")
         .displayName("Topic Name")
@@ -170,10 +172,10 @@ public class PublishKafka_1_0 extends AbstractProcessor {
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
         .name("key-attribute-encoding")
         .displayName("Key Attribute Encoding")
-        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
         .required(true)
-        .defaultValue(UTF8_ENCODING.getValue())
-        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .defaultValue(KeyEncoding.UTF8.getValue())
+        .allowableValues(KeyEncoding.class)
         .build();
 
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
@@ -273,7 +275,12 @@ public class PublishKafka_1_0 extends AbstractProcessor {
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        properties.add(BOOTSTRAP_SERVERS);
+        properties.add(SECURITY_PROTOCOL);
+        properties.add(KERBEROS_SERVICE_NAME);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_KEYTAB);
+        properties.add(SSL_CONTEXT_SERVICE);
         properties.add(TOPIC);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(USE_TRANSACTIONS);
@@ -313,7 +320,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
         return new PropertyDescriptor.Builder()
             .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
             .name(propertyDescriptorName)
-            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
             .dynamic(true)
             .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
@@ -321,8 +328,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+        final List<ValidationResult> results = new ArrayList<>(new KafkaClientCustomValidationFunction().apply(validationContext));
 
         final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
         if (useTransactions) {
@@ -363,19 +369,19 @@ public class PublishKafka_1_0 extends AbstractProcessor {
 
     protected PublisherPool createPublisherPool(final ProcessContext context) {
         final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
-        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
         final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
         final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
         final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
-        Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
+        Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
 
         final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
         final Charset charset = Charset.forName(charsetName);
 
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> kafkaProperties = propertyProvider.getProperties(context);
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
@@ -407,8 +413,8 @@ public class PublishKafka_1_0 extends AbstractProcessor {
             return;
         }
 
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
 
         final long startTime = System.nanoTime();
@@ -441,12 +447,9 @@ public class PublishKafka_1_0 extends AbstractProcessor {
                 }
 
                 final Integer partition = getPartition(context, flowFile);
-                session.read(flowFile, new InputStreamCallback() {
-                    @Override
-                    public void process(final InputStream rawIn) throws IOException {
-                        try (final InputStream in = new BufferedInputStream(rawIn)) {
-                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
-                        }
+                session.read(flowFile, rawIn -> {
+                    try (final InputStream in = new BufferedInputStream(rawIn)) {
+                        lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
                     }
                 });
             }
@@ -469,7 +472,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
                 success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                 session.adjustCounter("Messages Sent", msgCount, true);
 
-                final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, topic);
                 session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                 session.transfer(success, REL_SUCCESS);
             }
@@ -483,7 +486,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
         if (context.getProperty(KEY).isSet()) {
             uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         } else {
-            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+            uninterpretedKey = flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_KEY);
         }
 
         if (uninterpretedKey == null) {
@@ -491,7 +494,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
         }
 
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
-        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+        if (KeyEncoding.UTF8.getValue().equals(keyEncoding)) {
             return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
         }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
index 287b9aa449..a7d602baa3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_1_0.java
@@ -16,14 +16,11 @@
  */
 package org.apache.nifi.record.sink.kafka;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -39,10 +36,13 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -51,24 +51,19 @@ import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
-import org.apache.nifi.util.FormatUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-
 @Tags({"kafka", "record", "sink"})
 @CapabilityDescription("Provides a service to write records to a Kafka 1.x topic.")
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
@@ -76,7 +71,7 @@ import java.util.concurrent.TimeoutException;
                 + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
                 + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
-public class KafkaRecordSink_1_0 extends AbstractControllerService implements RecordSinkService {
+public class KafkaRecordSink_1_0 extends AbstractControllerService implements RecordSinkService, KafkaPublishComponent {
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
             "Records are considered 'transmitted unsuccessfully' unless the message is replicated to the appropriate "
@@ -89,10 +84,6 @@ public class KafkaRecordSink_1_0 extends AbstractControllerService implements Re
             "Records are considered 'transmitted successfully' after successfully writing the content to a Kafka node, "
                     + "without waiting for a response. This provides the best performance but may result in data loss.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-            "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")
@@ -171,17 +162,18 @@ public class KafkaRecordSink_1_0 extends AbstractControllerService implements Re
     private volatile Producer<byte[], byte[]> producer;
 
     @Override
-    protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
+    protected void init(final ControllerServiceInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        properties.add(BOOTSTRAP_SERVERS);
         properties.add(TOPIC);
         properties.add(RecordSinkService.RECORD_WRITER_FACTORY);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(MESSAGE_HEADER_ENCODING);
-        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
-        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
-        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        properties.add(SECURITY_PROTOCOL);
+        properties.add(SASL_MECHANISM);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_SERVICE_NAME);
+        properties.add(SSL_CONTEXT_SERVICE);
         properties.add(MAX_REQUEST_SIZE);
         properties.add(ACK_WAIT_TIME);
         properties.add(METADATA_WAIT_TIME);
@@ -199,7 +191,7 @@ public class KafkaRecordSink_1_0 extends AbstractControllerService implements Re
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName)
-                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+                .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
                 .dynamic(true)
                 .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
@@ -207,7 +199,7 @@ public class KafkaRecordSink_1_0 extends AbstractControllerService implements Re
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        return new KafkaClientCustomValidationFunction().apply(validationContext);
     }
 
     @OnEnabled
@@ -216,12 +208,10 @@ public class KafkaRecordSink_1_0 extends AbstractControllerService implements Re
         writerFactory = context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
         maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
         maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
+        maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
-        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
-        final Charset charset = Charset.forName(charsetName);
-
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        final KafkaPropertyProvider kafkaPropertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> kafkaProperties = kafkaPropertyProvider.getProperties(context);
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
@@ -299,93 +289,12 @@ public class KafkaRecordSink_1_0 extends AbstractControllerService implements Re
     }
 
     @OnDisabled
-    public void stop() throws IOException {
+    public void stop() {
         if (producer != null) {
             producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
         }
     }
 
-    static void buildCommonKafkaProperties(final ConfigurationContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
-        for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
-            if (propertyDescriptor.equals(KafkaProcessorUtils.SSL_CONTEXT_SERVICE)) {
-                // Translate SSLContext Service configuration into Kafka properties
-                final SSLContextService sslContextService = context.getProperty(KafkaProcessorUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-                if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
-                    final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
-                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
-                }
-
-                if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
-                }
-            }
-
-            String propertyName = propertyDescriptor.getName();
-            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
-                    ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
-                    : context.getProperty(propertyDescriptor).getValue();
-
-            if (propertyValue != null) {
-                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
-                // or the standard NiFi time period such as "5 secs"
-                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
-                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
-                }
-
-                if (KafkaProcessorUtils.isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(propertyName, propertyValue);
-                }
-            }
-        }
-
-        String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        if (KafkaProcessorUtils.SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || KafkaProcessorUtils.SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            setJaasConfig(mapToPopulate, context);
-        }
-    }
-
-    /**
-     * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
-     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
-     * <br />
-     * It expects something with the following format: <br />
-     * <br />
-     * &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; *(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
-     * ControlFlag = required / requisite / sufficient / optional
-     *
-     * @param mapToPopulate Map of configuration properties
-     * @param context       Context
-     */
-    private static void setJaasConfig(Map<String, Object> mapToPopulate, ConfigurationContext context) {
-        String keytab = null;
-        String principal = null;
-
-        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
-        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
-        final KerberosCredentialsService credentialsService = context.getProperty(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-        if (credentialsService != null) {
-            principal = credentialsService.getPrincipal();
-            keytab = credentialsService.getKeytab();
-        }
-
-
-        String serviceName = context.getProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-        if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
-            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
-                    + "useTicketCache=false "
-                    + "renewTicket=true "
-                    + "serviceName=\"" + serviceName + "\" "
-                    + "useKeyTab=true "
-                    + "keyTab=\"" + keytab + "\" "
-                    + "principal=\"" + principal + "\";");
-        }
-    }
-
     // this getter is intended explicitly for testing purposes
     protected RecordSetWriterFactory getWriterFactory() {
         return this.writerFactory;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index ff276b665b..22a7b4ddb0 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
@@ -39,80 +40,74 @@ public class ConsumeKafkaTest {
     }
 
     @Test
-    public void validateCustomValidatorSettings() throws Exception {
+    public void validateCustomValidatorSettings() {
         ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
         runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
         runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
-        runner.assertNotValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        runner.assertValid();
         runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        runner.assertNotValid();
     }
 
     @Test
-    public void validatePropertiesValidation() throws Exception {
+    public void validatePropertiesValidation() {
         ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
         runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
 
         runner.removeProperty(ConsumeKafka_1_0.GROUP_ID);
 
-        AssertionError e = assertThrows(AssertionError.class, () -> runner.assertValid());
+        AssertionError e = assertThrows(AssertionError.class, runner::assertValid);
         assertTrue(e.getMessage().contains("invalid because Group ID is required"));
 
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "");
 
-        e = assertThrows(AssertionError.class, () -> runner.assertValid());
+        e = assertThrows(AssertionError.class, runner::assertValid);
         assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
 
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "  ");
 
-        e = assertThrows(AssertionError.class, () -> runner.assertValid());
+        e = assertThrows(AssertionError.class, runner::assertValid);
         assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
     }
 
     @Test
-    public void testJaasConfiguration() throws Exception {
+    public void testJaasConfiguration() {
         ConsumeKafka_1_0 consumeKafka = new ConsumeKafka_1_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_1_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
         runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafka_1_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
-        runner.assertValid();
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_SERVICE_NAME, "kafka");
+        runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_KEYTAB, "not.A.File");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_KEYTAB, "src/test/resources/server.properties");
         runner.assertValid();
 
         runner.setVariable("keytab", "src/test/resources/server.properties");
         runner.setVariable("principal", "nifi@APACHE.COM");
         runner.setVariable("service", "kafka");
-        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}");
-        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_PRINCIPAL, "${principal}");
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_KEYTAB, "${keytab}");
+        runner.setProperty(ConsumeKafka_1_0.KERBEROS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
index 13a0e78912..1a54d0d13d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPoolTest.java
@@ -51,8 +51,7 @@ public class ConsumerPoolTest {
 
     private Consumer<byte[], byte[]> consumer = null;
     private ProcessSession mockSession = null;
-    private ProcessContext mockContext = Mockito.mock(ProcessContext.class);
-    private ProvenanceReporter mockReporter = null;
+    private final ProcessContext mockContext = Mockito.mock(ProcessContext.class);
     private ConsumerPool testPool = null;
     private ConsumerPool testDemarcatedPool = null;
     private ComponentLog logger = null;
@@ -63,7 +62,7 @@ public class ConsumerPoolTest {
         consumer = mock(Consumer.class);
         logger = mock(ComponentLog.class);
         mockSession = mock(ProcessSession.class);
-        mockReporter = mock(ProvenanceReporter.class);
+        final ProvenanceReporter mockReporter = mock(ProvenanceReporter.class);
         when(mockSession.getProvenanceReporter()).thenReturn(mockReporter);
         testPool = new ConsumerPool(
                 1,
@@ -104,7 +103,7 @@ public class ConsumerPoolTest {
     }
 
     @Test
-    public void validatePoolSimpleCreateClose() throws Exception {
+    public void validatePoolSimpleCreateClose() {
 
         when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
@@ -130,7 +129,7 @@ public class ConsumerPoolTest {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void validatePoolSimpleCreatePollClose() throws Exception {
+    public void validatePoolSimpleCreatePollClose() {
         final byte[][] firstPassValues = new byte[][]{
             "Hello-1".getBytes(StandardCharsets.UTF_8),
             "Hello-2".getBytes(StandardCharsets.UTF_8),
@@ -153,7 +152,7 @@ public class ConsumerPoolTest {
     }
 
     @Test
-    public void validatePoolSimpleBatchCreateClose() throws Exception {
+    public void validatePoolSimpleBatchCreateClose() {
         when(consumer.poll(anyLong())).thenReturn(createConsumerRecords("nifi", 0, 0L, new byte[][]{}));
         for (int i = 0; i < 100; i++) {
             try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
@@ -173,7 +172,7 @@ public class ConsumerPoolTest {
 
     @Test
     @SuppressWarnings("unchecked")
-    public void validatePoolBatchCreatePollClose() throws Exception {
+    public void validatePoolBatchCreatePollClose() {
         final byte[][] firstPassValues = new byte[][]{
             "Hello-1".getBytes(StandardCharsets.UTF_8),
             "Hello-2".getBytes(StandardCharsets.UTF_8),
@@ -200,7 +199,7 @@ public class ConsumerPoolTest {
 
         when(consumer.poll(anyLong())).thenThrow(new KafkaException("oops"));
         try (final ConsumerLease lease = testPool.obtainConsumer(mockSession, mockContext)) {
-            assertThrows(KafkaException.class, () -> lease.poll());
+            assertThrows(KafkaException.class, lease::poll);
         }
         testPool.close();
         verify(mockSession, times(0)).create();
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
index a7eb913344..a5f33db041 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka.java
@@ -42,7 +42,7 @@ public class ITConsumeKafka {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
+    public void validateGetAllMessages() {
         String groupName = "validateGetAllMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -56,7 +56,7 @@ public class ITConsumeKafka {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
@@ -72,7 +72,7 @@ public class ITConsumeKafka {
     }
 
     @Test
-    public void validateGetAllMessagesPattern() throws Exception {
+    public void validateGetAllMessagesPattern() {
         String groupName = "validateGetAllMessagesPattern";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -86,7 +86,7 @@ public class ITConsumeKafka {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_1_0.TOPICS, "(fo.*)|(ba)");
         runner.setProperty(ConsumeKafka_1_0.TOPIC_TYPE, "pattern");
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
@@ -103,7 +103,7 @@ public class ITConsumeKafka {
     }
 
     @Test
-    public void validateGetErrorMessages() throws Exception {
+    public void validateGetErrorMessages() {
         String groupName = "validateGetErrorMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -117,7 +117,7 @@ public class ITConsumeKafka {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_1_0.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_1_0.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_1_0.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_1_0.AUTO_OFFSET_RESET, ConsumeKafka_1_0.OFFSET_EARLIEST);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
index 439c2f3fa5..d2dae0f04c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_1_0.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -58,7 +59,7 @@ public class TestConsumeKafkaRecord_1_0 {
         };
 
         runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafkaRecord_1_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
 
         final String readerId = "record-reader";
         final MockRecordParser readerService = new MockRecordParser();
@@ -77,24 +78,18 @@ public class TestConsumeKafkaRecord_1_0 {
     }
 
     @Test
-    public void validateCustomValidatorSettings() throws Exception {
+    public void validateCustomValidatorSettings() {
         runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
         runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
-        runner.assertNotValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        runner.assertValid();
         runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        runner.assertNotValid();
     }
 
     @Test
-    public void validatePropertiesValidation() throws Exception {
+    public void validatePropertiesValidation() {
         runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
@@ -115,7 +110,7 @@ public class TestConsumeKafkaRecord_1_0 {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
+    public void validateGetAllMessages() {
         String groupName = "validateGetAllMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -137,7 +132,7 @@ public class TestConsumeKafkaRecord_1_0 {
     }
 
     @Test
-    public void validateGetAllMessagesPattern() throws Exception {
+    public void validateGetAllMessagesPattern() {
         String groupName = "validateGetAllMessagesPattern";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -160,7 +155,7 @@ public class TestConsumeKafkaRecord_1_0 {
     }
 
     @Test
-    public void validateGetErrorMessages() throws Exception {
+    public void validateGetErrorMessages() {
         String groupName = "validateGetErrorMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -182,24 +177,24 @@ public class TestConsumeKafkaRecord_1_0 {
     }
 
     @Test
-    public void testJaasConfiguration() throws Exception {
+    public void testJaasConfiguration() {
         runner.setProperty(ConsumeKafkaRecord_1_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_1_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_1_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_1_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafkaRecord_1_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
-        runner.assertValid();
+        runner.setProperty(ConsumeKafkaRecord_1_0.KERBEROS_SERVICE_NAME, "kafka");
+        runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.setProperty(ConsumeKafkaRecord_1_0.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.setProperty(ConsumeKafkaRecord_1_0.KERBEROS_KEYTAB, "not.A.File");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.setProperty(ConsumeKafkaRecord_1_0.KERBEROS_KEYTAB, "src/test/resources/server.properties");
         runner.assertValid();
     }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_1_0.java
index ae0da43069..eb42d34019 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_1_0.java
@@ -27,10 +27,10 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -138,13 +138,13 @@ public class TestKafkaRecordSink_1_0 {
         when(context.getProperty(KafkaRecordSink_1_0.MESSAGE_HEADER_ENCODING)).thenReturn(charEncodingValue);
 
         final PropertyValue securityValue = Mockito.mock(StandardPropertyValue.class);
-        when(securityValue.getValue()).thenReturn(KafkaProcessorUtils.SEC_SASL_PLAINTEXT.getValue());
-        when(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL)).thenReturn(securityValue);
+        when(securityValue.getValue()).thenReturn(SecurityProtocol.PLAINTEXT.name());
+        when(context.getProperty(KafkaRecordSink_1_0.SECURITY_PROTOCOL)).thenReturn(securityValue);
 
         final PropertyValue jaasValue = Mockito.mock(StandardPropertyValue.class);
         when(jaasValue.evaluateAttributeExpressions()).thenReturn(jaasValue);
         when(jaasValue.getValue()).thenReturn(null);
-        when(context.getProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME)).thenReturn(jaasValue);
+        when(context.getProperty(KafkaRecordSink_1_0.KERBEROS_SERVICE_NAME)).thenReturn(jaasValue);
 
         Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
         propertyMap.put(KafkaRecordSink_1_0.TOPIC, KafkaRecordSink_1_0.TOPIC.getName());
@@ -160,9 +160,9 @@ public class TestKafkaRecordSink_1_0 {
         MockRecordWriter writer = new MockRecordWriter(null, false);
         when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue);
         when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
-        when(context.getProperty(KafkaProcessorUtils.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
+        when(context.getProperty(KafkaRecordSink_1_0.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
         when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
-        when(context.getProperty(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
+        when(context.getProperty(KafkaRecordSink_1_0.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
         when(pValue.asControllerService(KerberosCredentialsService.class)).thenReturn(null);
 
         final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties
deleted file mode 100644
index 57cd63f139..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootCategory=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
-
-#og4j.category.org.apache.nifi.processors.kafka=DEBUG
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/zookeeper.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/zookeeper.properties
deleted file mode 100644
index f5c257efeb..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/resources/zookeeper.properties
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=target/kafka-tmp/zookeeper
-# the port at which the clients will connect
-#clientPort=2181
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
index 116e483f27..644af77411 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/pom.xml
@@ -52,7 +52,19 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-kerberos-credentials-service-api</artifactId>
         </dependency>
-
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kerberos-user-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-kerberos</artifactId>
+            <version>1.19.0-SNAPSHOT</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-kafka-shared</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
index de1ffee237..e0e0cb5894 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_0.java
@@ -34,6 +34,13 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -49,7 +56,6 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -59,9 +65,6 @@ import java.util.regex.Pattern;
 
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.0 Consumer API. "
     + "The complementary NiFi processor for sending messages is PublishKafkaRecord_2_0. Please note that, at this time, the Processor assumes that "
@@ -74,9 +77,9 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
 @WritesAttributes({
     @WritesAttribute(attribute = "record.count", description = "The number of records received"),
     @WritesAttribute(attribute = "mime.type", description = "The MIME Type that is provided by the configured Record Writer"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic records are from")
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the records are from"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic records are from")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
@@ -85,7 +88,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.",
         expressionLanguageScope = VARIABLE_REGISTRY)
 @SeeAlso({ConsumeKafka_2_0.class, PublishKafka_2_0.class, PublishKafkaRecord_2_0.class})
-public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
+public class ConsumeKafkaRecord_2_0 extends AbstractProcessor implements KafkaClientComponent {
 
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@@ -223,11 +226,11 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
         .name("key-attribute-encoding")
         .displayName("Key Attribute Encoding")
-        .description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY +
+        .description("If the <Separate By Key> property is set to true, FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY +
             "'. This property dictates how the value of the attribute should be encoded.")
         .required(true)
-        .defaultValue(UTF8_ENCODING.getValue())
-        .allowableValues(UTF8_ENCODING, HEX_ENCODING, DO_NOT_ADD_KEY_AS_ATTRIBUTE)
+        .defaultValue(KeyEncoding.UTF8.getValue())
+        .allowableValues(KeyEncoding.class)
         .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
@@ -248,22 +251,22 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        descriptors.add(BOOTSTRAP_SERVERS);
         descriptors.add(TOPICS);
         descriptors.add(TOPIC_TYPE);
         descriptors.add(RECORD_READER);
         descriptors.add(RECORD_WRITER);
         descriptors.add(HONOR_TRANSACTIONS);
-        descriptors.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        descriptors.add(KafkaProcessorUtils.SASL_MECHANISM);
-        descriptors.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
-        descriptors.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
-        descriptors.add(KafkaProcessorUtils.USER_PRINCIPAL);
-        descriptors.add(KafkaProcessorUtils.USER_KEYTAB);
-        descriptors.add(KafkaProcessorUtils.USERNAME);
-        descriptors.add(KafkaProcessorUtils.PASSWORD);
-        descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
-        descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        descriptors.add(SECURITY_PROTOCOL);
+        descriptors.add(SASL_MECHANISM);
+        descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
+        descriptors.add(KERBEROS_SERVICE_NAME);
+        descriptors.add(KERBEROS_PRINCIPAL);
+        descriptors.add(KERBEROS_KEYTAB);
+        descriptors.add(SASL_USERNAME);
+        descriptors.add(SASL_PASSWORD);
+        descriptors.add(TOKEN_AUTHENTICATION);
+        descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(GROUP_ID);
         descriptors.add(SEPARATE_BY_KEY);
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
@@ -306,7 +309,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         return new Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName)
-                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+                .addValidator(new DynamicPropertyValidator(ConsumerConfig.class))
                 .dynamic(true)
                 .expressionLanguageSupported(VARIABLE_REGISTRY)
                 .build();
@@ -314,7 +317,7 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+        final Collection<ValidationResult> validationResults = new KafkaClientCustomValidationFunction().apply(validationContext);
 
         final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
         validationResults.add(consumerPartitionsResult);
@@ -368,16 +371,16 @@ public class ConsumeKafkaRecord_2_0 extends AbstractProcessor {
         final int maxLeases = context.getMaxConcurrentTasks();
         final long maxUncommittedTime = context.getProperty(MAX_UNCOMMITTED_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
-        final Map<String, Object> props = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ConsumerConfig.class);
+        final Map<String, Object> props = propertyProvider.getProperties(context);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         final String topicListing = context.getProperty(ConsumeKafkaRecord_2_0.TOPICS).evaluateAttributeExpressions().getValue();
         final String topicType = context.getProperty(ConsumeKafkaRecord_2_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
index 4a6fd1b171..e28852cdc9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_2_0.java
@@ -34,6 +34,13 @@ import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -48,7 +55,6 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -56,20 +62,17 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-
 @CapabilityDescription("Consumes messages from Apache Kafka specifically built against the Kafka 2.0 Consumer API. "
     + "The complementary NiFi processor for sending messages is PublishKafka_2_0.")
 @Tags({"Kafka", "Get", "Ingest", "Ingress", "Topic", "PubSub", "Consume", "2.0"})
 @WritesAttributes({
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_COUNT, description = "The number of messages written if more than one"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_KEY, description = "The key of message if present and if single message. "
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_COUNT, description = "The number of messages written if more than one"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_KEY, description = "The key of message if present and if single message. "
             + "How the key is encoded depends on the value of the 'Key Attribute Encoding' property."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
-    @WritesAttribute(attribute = KafkaProcessorUtils.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_OFFSET, description = "The offset of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TIMESTAMP, description = "The timestamp of the message in the partition of the topic."),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_PARTITION, description = "The partition of the topic the message or message bundle is from"),
+    @WritesAttribute(attribute = KafkaFlowFileAttribute.KAFKA_TOPIC, description = "The topic the message or message bundle is from")
 })
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
@@ -77,7 +80,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
         + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
-public class ConsumeKafka_2_0 extends AbstractProcessor {
+public class ConsumeKafka_2_0 extends AbstractProcessor implements KafkaClientComponent {
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
 
     static final AllowableValue OFFSET_LATEST = new AllowableValue("latest", "latest", "Automatically reset the offset to the latest offset");
@@ -128,10 +131,10 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
             .name("key-attribute-encoding")
             .displayName("Key Attribute Encoding")
-            .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+            .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
             .required(true)
-            .defaultValue(UTF8_ENCODING.getValue())
-            .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+            .defaultValue(KeyEncoding.UTF8.getValue())
+            .allowableValues(KeyEncoding.class)
             .build();
 
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
@@ -236,7 +239,17 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
 
     static {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
-        descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        descriptors.add(BOOTSTRAP_SERVERS);
+        descriptors.add(SECURITY_PROTOCOL);
+        descriptors.add(SASL_MECHANISM);
+        descriptors.add(KERBEROS_SERVICE_NAME);
+        descriptors.add(KERBEROS_CREDENTIALS_SERVICE);
+        descriptors.add(KERBEROS_PRINCIPAL);
+        descriptors.add(KERBEROS_KEYTAB);
+        descriptors.add(SASL_USERNAME);
+        descriptors.add(SASL_PASSWORD);
+        descriptors.add(TOKEN_AUTHENTICATION);
+        descriptors.add(SSL_CONTEXT_SERVICE);
         descriptors.add(TOPICS);
         descriptors.add(TOPIC_TYPE);
         descriptors.add(HONOR_TRANSACTIONS);
@@ -278,7 +291,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName)
-                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+                .addValidator(new DynamicPropertyValidator(ConsumerConfig.class))
                 .dynamic(true)
                 .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
@@ -286,7 +299,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final Collection<ValidationResult> validationResults = KafkaProcessorUtils.validateCommonProperties(validationContext);
+        final Collection<ValidationResult> validationResults = new KafkaClientCustomValidationFunction().apply(validationContext);
 
         final ValidationResult consumerPartitionsResult = ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties());
         validationResults.add(consumerPartitionsResult);
@@ -342,9 +355,9 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         final byte[] demarcator = context.getProperty(ConsumeKafka_2_0.MESSAGE_DEMARCATOR).isSet()
                 ? context.getProperty(ConsumeKafka_2_0.MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8)
                 : null;
-        final Map<String, Object> props = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ConsumerConfig.class, props);
-        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ConsumerConfig.class);
+        final Map<String, Object> props = propertyProvider.getProperties(context);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.FALSE.toString());
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
 
@@ -352,8 +365,8 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         final String topicType = context.getProperty(ConsumeKafka_2_0.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final boolean honorTransactions = context.getProperty(HONOR_TRANSACTIONS).asBoolean();
         final int commsTimeoutMillis = context.getProperty(COMMS_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
         props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, commsTimeoutMillis);
@@ -388,7 +401,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
             return new ConsumerPool(maxLeases, demarcator, separateByKey, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol,
                 bootstrapServers, log, honorTransactions, charset, headerNamePattern, partitionsToConsume);
         } else {
-            getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+            getLogger().error("Subscription type has an unknown value {}", topicType);
             return null;
         }
     }
@@ -413,12 +426,12 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
         if (!activeLeases.isEmpty()) {
             int count = 0;
             for (final ConsumerLease lease : activeLeases) {
-                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", new Object[] {lease});
+                getLogger().info("Consumer {} has not finished after waiting 30 seconds; will attempt to wake-up the lease", lease);
                 lease.wakeup();
                 count++;
             }
 
-            getLogger().info("Woke up {} consumers", new Object[] {count});
+            getLogger().info("Woke up {} consumers", count);
         }
 
         activeLeases.clear();
@@ -450,7 +463,7 @@ public class ConsumeKafka_2_0 extends AbstractProcessor {
                 }
             } catch (final WakeupException we) {
                 getLogger().warn("Was interrupted while trying to communicate with Kafka with lease {}. "
-                    + "Will roll back session and discard any partially received data.", new Object[] {lease});
+                    + "Will roll back session and discard any partially received data.", lease);
             } catch (final KafkaException kex) {
                 getLogger().error("Exception while interacting with Kafka so will close the lease {} due to {}",
                         new Object[]{lease, kex}, kex);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index c87852b48f..e7f6459990 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -27,6 +27,9 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
@@ -62,8 +65,6 @@ import java.util.stream.Collectors;
 
 import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0.REL_PARSE_FAILURE;
 import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_0.REL_SUCCESS;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -147,7 +148,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      */
     @Override
     public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
-        logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        logger.debug("Rebalance Alert: Partitions '{}' revoked for lease '{}' with consumer '{}'", partitions, this, kafkaConsumer);
         //force a commit here.  Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
         commit();
     }
@@ -161,7 +162,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      */
     @Override
     public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
-        logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+        logger.debug("Rebalance Alert: Partitions '{}' assigned for lease '{}' with consumer '{}'", partitions, this, kafkaConsumer);
     }
 
     public List<TopicPartition> getAssignedPartitions() {
@@ -200,7 +201,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
      * higher performance than the other commitOffsets call as it allows the
      * kafka client to collect more data from Kafka before committing the
      * offsets.
-     *
      * if false then we didn't do anything and should probably yield if true
      * then we committed new data
      *
@@ -358,9 +358,9 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             return null;
         }
 
-        if (HEX_ENCODING.getValue().equals(encoding)) {
+        if (KeyEncoding.HEX.getValue().equals(encoding)) {
             return DatatypeConverter.printHexBinary(key);
-        } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+        } else if (KeyEncoding.UTF8.getValue().equals(encoding)) {
             return new String(key, StandardCharsets.UTF_8);
         } else {
             return null;  // won't happen because it is guaranteed by the Allowable Values
@@ -474,10 +474,10 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
     private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
         // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
         final Map<String, String> attributes = getAttributes(consumerRecord);
-        attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
-        attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
-        attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+        attributes.put(KafkaFlowFileAttribute.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+        attributes.put(KafkaFlowFileAttribute.KAFKA_TIMESTAMP, String.valueOf(consumerRecord.timestamp()));
+        attributes.put(KafkaFlowFileAttribute.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+        attributes.put(KafkaFlowFileAttribute.KAFKA_TOPIC, consumerRecord.topic());
 
         FlowFile failureFlowFile = session.create();
 
@@ -487,7 +487,7 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
         }
         failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+        final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, consumerRecord.topic());
         session.getProvenanceReporter().receive(failureFlowFile, transitUri);
 
         session.transfer(failureFlowFile, REL_PARSE_FAILURE);
@@ -594,7 +594,6 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                         }
                     } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
                         handleParseFailure(consumerRecord, session, e);
-                        continue;
                     }
                 }
             }
@@ -636,32 +635,32 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
 
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_TIMESTAMP, String.valueOf(tracker.initialTimestamp));
 
         // If we have a kafka key, we will add it as an attribute only if
         // the FlowFile contains a single Record, or if the Records have been separated by Key,
         // because we then know that even though there are multiple Records, they all have the same key.
         if (tracker.key != null && (tracker.totalRecords == 1 || separateByKey)) {
-            if (!keyEncoding.equalsIgnoreCase(KafkaProcessorUtils.DO_NOT_ADD_KEY_AS_ATTRIBUTE.getValue())) {
-                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+            if (!keyEncoding.equalsIgnoreCase(KeyEncoding.DO_NOT_ADD.getValue())) {
+                kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_KEY, tracker.key);
             }
         }
 
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
-        kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_PARTITION, String.valueOf(tracker.partition));
+        kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_TOPIC, tracker.topic);
         if (tracker.totalRecords > 1) {
             // Add a record.count attribute to remain consistent with other record-oriented processors. If not
             // reading/writing records, then use "kafka.count" attribute.
             if (tracker.recordWriter == null) {
-                kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+                kafkaAttrs.put(KafkaFlowFileAttribute.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
             } else {
                 kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
             }
         }
         final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
         final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
-        final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+        final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, tracker.topic);
         getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
         tracker.updateFlowFile(newFlowFile);
     }
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
deleted file mode 100644
index 5c378a5715..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ /dev/null
@@ -1,557 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.kafka.pubsub;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.nifi.components.AllowableValue;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
-import org.apache.nifi.components.resource.ResourceCardinality;
-import org.apache.nifi.components.resource.ResourceType;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.ssl.SSLContextService;
-import org.apache.nifi.util.FormatUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-import java.util.regex.Pattern;
-
-public final class KafkaProcessorUtils {
-    private static final String ALLOW_EXPLICIT_KEYTAB = "NIFI_ALLOW_EXPLICIT_KEYTAB";
-
-    final Logger logger = LoggerFactory.getLogger(this.getClass());
-
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-            "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
-    static final AllowableValue DO_NOT_ADD_KEY_AS_ATTRIBUTE = new AllowableValue("do-not-add", "Do Not Add Key as Attribute",
-        "The key will not be added as an Attribute");
-
-    static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
-
-    static final String KAFKA_KEY = "kafka.key";
-    static final String KAFKA_TOPIC = "kafka.topic";
-    static final String KAFKA_PARTITION = "kafka.partition";
-    static final String KAFKA_OFFSET = "kafka.offset";
-    static final String KAFKA_TIMESTAMP = "kafka.timestamp";
-    static final String KAFKA_COUNT = "kafka.count";
-
-    static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
-    static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
-    public static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
-    public static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
-
-    static final String GSSAPI_VALUE = "GSSAPI";
-    static final AllowableValue SASL_MECHANISM_GSSAPI = new AllowableValue(GSSAPI_VALUE, GSSAPI_VALUE,
-            "The mechanism for authentication via Kerberos. The principal and keytab must be provided to the processor " +
-                    "by using a Keytab Credential service, or by specifying the properties directly in the processor.");
-
-    static final String PLAIN_VALUE = "PLAIN";
-    static final AllowableValue SASL_MECHANISM_PLAIN = new AllowableValue(PLAIN_VALUE, PLAIN_VALUE,
-            "The mechanism for authentication via username and password. The username and password properties must " +
-                    "be populated when using this mechanism.");
-
-    static final String SCRAM_SHA256_VALUE = "SCRAM-SHA-256";
-    static final AllowableValue SASL_MECHANISM_SCRAM_SHA256 = new AllowableValue(SCRAM_SHA256_VALUE, SCRAM_SHA256_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-256. " +
-            "The username and password properties must be set when using this mechanism.");
-
-    static final String SCRAM_SHA512_VALUE = "SCRAM-SHA-512";
-    static final AllowableValue SASL_MECHANISM_SCRAM_SHA512 = new AllowableValue(SCRAM_SHA512_VALUE, SCRAM_SHA512_VALUE,"The Salted Challenge Response Authentication Mechanism using SHA-512. " +
-            "The username and password properties must be set when using this mechanism.");
-
-    static final AllowableValue FAILURE_STRATEGY_FAILURE_RELATIONSHIP = new AllowableValue("Route to Failure", "Route to Failure",
-        "When unable to publish a FlowFile to Kafka, the FlowFile will be routed to the 'failure' relationship.");
-    static final AllowableValue FAILURE_STRATEGY_ROLLBACK = new AllowableValue("Rollback", "Rollback",
-        "When unable to publish a FlowFile to Kafka, the FlowFile will be placed back on the top of its queue so that it will be the next FlowFile tried again. " +
-            "For dataflows where ordering of FlowFiles is important, this strategy can be used along with ensuring that the each processor in the dataflow uses only a single Concurrent Task.");
-
-    public static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
-            .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
-            .displayName("Kafka Brokers")
-            .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
-            .required(true)
-            .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .defaultValue("localhost:9092")
-            .build();
-    public static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
-            .name("security.protocol")
-            .displayName("Security Protocol")
-            .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
-            .defaultValue(SEC_PLAINTEXT.getValue())
-            .build();
-    static final PropertyDescriptor SASL_MECHANISM = new PropertyDescriptor.Builder()
-            .name("sasl.mechanism")
-            .displayName("SASL Mechanism")
-            .description("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues(SASL_MECHANISM_GSSAPI, SASL_MECHANISM_PLAIN, SASL_MECHANISM_SCRAM_SHA256, SASL_MECHANISM_SCRAM_SHA512)
-            .defaultValue(GSSAPI_VALUE)
-            .build();
-    public static final PropertyDescriptor JAAS_SERVICE_NAME = new PropertyDescriptor.Builder()
-            .name("sasl.kerberos.service.name")
-            .displayName("Kerberos Service Name")
-            .description("The service name that matches the primary name of the Kafka server configured in the broker JAAS file."
-                    + "This can be defined either in Kafka's JAAS config or in Kafka's config. "
-                    + "Corresponds to Kafka's 'security.protocol' property."
-                    + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
-            .name("sasl.kerberos.principal")
-            .displayName("Kerberos Principal")
-            .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
-                    + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
-            .name("sasl.kerberos.keytab")
-            .displayName("Kerberos Keytab")
-            .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
-                    + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
-            .required(false)
-            .identifiesExternalResource(ResourceCardinality.SINGLE, ResourceType.FILE)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor USERNAME = new PropertyDescriptor.Builder()
-            .name("sasl.username")
-            .displayName("Username")
-            .description("The username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
-            .required(false)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
-            .name("sasl.password")
-            .displayName("Password")
-            .description("The password for the given username when the SASL Mechanism is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
-            .required(false)
-            .sensitive(true)
-            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .build();
-    static final PropertyDescriptor TOKEN_AUTH = new PropertyDescriptor.Builder()
-            .name("sasl.token.auth")
-            .displayName("Token Auth")
-            .description("When " + SASL_MECHANISM.getDisplayName() + " is " + SCRAM_SHA256_VALUE + " or " + SCRAM_SHA512_VALUE
-                    + ", this property indicates if token authentication should be used.")
-            .required(false)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .build();
-    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
-            .name("ssl.context.service")
-            .displayName("SSL Context Service")
-            .description("Specifies the SSL Context Service to use for communicating with Kafka.")
-            .required(false)
-            .identifiesControllerService(SSLContextService.class)
-            .build();
-    public static final PropertyDescriptor KERBEROS_CREDENTIALS_SERVICE = new PropertyDescriptor.Builder()
-        .name("kerberos-credentials-service")
-        .displayName("Kerberos Credentials Service")
-        .description("Specifies the Kerberos Credentials Controller Service that should be used for authenticating with Kerberos")
-        .identifiesControllerService(KerberosCredentialsService.class)
-        .required(false)
-        .build();
-
-    static final PropertyDescriptor FAILURE_STRATEGY = new PropertyDescriptor.Builder()
-        .name("Failure Strategy")
-        .displayName("Failure Strategy")
-        .description("Dictates how the processor handles a FlowFile if it is unable to publish the data to Kafka")
-        .required(true)
-        .allowableValues(FAILURE_STRATEGY_FAILURE_RELATIONSHIP, FAILURE_STRATEGY_ROLLBACK)
-        .defaultValue(FAILURE_STRATEGY_FAILURE_RELATIONSHIP.getValue())
-        .build();
-
-    static List<PropertyDescriptor> getCommonPropertyDescriptors() {
-        return Arrays.asList(
-                BOOTSTRAP_SERVERS,
-                SECURITY_PROTOCOL,
-                SASL_MECHANISM,
-                JAAS_SERVICE_NAME,
-                KERBEROS_CREDENTIALS_SERVICE,
-                USER_PRINCIPAL,
-                USER_KEYTAB,
-                USERNAME,
-                PASSWORD,
-                TOKEN_AUTH,
-                SSL_CONTEXT_SERVICE
-        );
-    }
-
-    public static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) {
-        List<ValidationResult> results = new ArrayList<>();
-
-        final String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
-        final String saslMechanism = validationContext.getProperty(SASL_MECHANISM).getValue();
-
-        final String explicitPrincipal = validationContext.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-        final String explicitKeytab = validationContext.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-        final KerberosCredentialsService credentialsService = validationContext.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-
-        final String resolvedPrincipal;
-        final String resolvedKeytab;
-        if (credentialsService == null) {
-            resolvedPrincipal = explicitPrincipal;
-            resolvedKeytab = explicitKeytab;
-        } else {
-            resolvedPrincipal = credentialsService.getPrincipal();
-            resolvedKeytab = credentialsService.getKeytab();
-        }
-
-        if (credentialsService != null && (explicitPrincipal != null || explicitKeytab != null)) {
-            results.add(new ValidationResult.Builder()
-                .subject("Kerberos Credentials")
-                .valid(false)
-                .explanation("Cannot specify both a Kerberos Credentials Service and a principal/keytab")
-                .build());
-        }
-
-        final String allowExplicitKeytabVariable = System.getenv(ALLOW_EXPLICIT_KEYTAB);
-        if ("false".equalsIgnoreCase(allowExplicitKeytabVariable) && (explicitPrincipal != null || explicitKeytab != null)) {
-            results.add(new ValidationResult.Builder()
-                .subject("Kerberos Credentials")
-                .valid(false)
-                .explanation("The '" + ALLOW_EXPLICIT_KEYTAB + "' system environment variable is configured to forbid explicitly configuring principal/keytab in processors. "
-                    + "The Kerberos Credentials Service should be used instead of setting the Kerberos Keytab or Kerberos Principal property.")
-                .build());
-        }
-
-        // validates that if the SASL mechanism is GSSAPI (kerberos) AND one of the SASL options is selected
-        // for security protocol, then Kerberos principal is provided as well
-        if (SASL_MECHANISM_GSSAPI.getValue().equals(saslMechanism)
-                && (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol))) {
-            String jaasServiceName = validationContext.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-            if (jaasServiceName == null || jaasServiceName.trim().length() == 0) {
-                results.add(new ValidationResult.Builder().subject(JAAS_SERVICE_NAME.getDisplayName()).valid(false)
-                    .explanation("The <" + JAAS_SERVICE_NAME.getDisplayName() + "> property must be set when <"
-                        + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
-                        + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
-                    .build());
-            }
-
-            if ((resolvedKeytab == null && resolvedPrincipal != null) || (resolvedKeytab != null && resolvedPrincipal == null)) {
-                results.add(new ValidationResult.Builder()
-                    .subject(JAAS_SERVICE_NAME.getDisplayName())
-                    .valid(false)
-                    .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
-                        + "must be set or neither must be set.")
-                    .build());
-            }
-        }
-
-        // validate that if SASL Mechanism is PLAIN or SCRAM, then username and password are both provided
-        if (SASL_MECHANISM_PLAIN.getValue().equals(saslMechanism)
-                || SASL_MECHANISM_SCRAM_SHA256.getValue().equals(saslMechanism)
-                || SASL_MECHANISM_SCRAM_SHA512.getValue().equals(saslMechanism)) {
-            final String username = validationContext.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
-            if (StringUtils.isBlank(username)) {
-                results.add(new ValidationResult.Builder()
-                        .subject(USERNAME.getDisplayName())
-                        .valid(false)
-                        .explanation("A username is required when " + SASL_MECHANISM.getDisplayName()
-                                + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
-                        .build());
-            }
-
-            final String password = validationContext.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
-            if (StringUtils.isBlank(password)) {
-                results.add(new ValidationResult.Builder()
-                        .subject(PASSWORD.getDisplayName())
-                        .valid(false)
-                        .explanation("A password is required when " + SASL_MECHANISM.getDisplayName()
-                                + " is " + PLAIN_VALUE + " or " + SCRAM_SHA256_VALUE + "/" + SCRAM_SHA512_VALUE)
-                        .build());
-            }
-        }
-
-        // If SSL or SASL_SSL then SSLContext Controller Service must be set.
-        final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
-        final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
-        if (csSet && !sslProtocol) {
-            results.add(new ValidationResult.Builder()
-                .subject(SECURITY_PROTOCOL.getDisplayName())
-                .valid(false)
-                .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.")
-                .build());
-        }
-
-        if (!csSet && sslProtocol) {
-            results.add(new ValidationResult.Builder()
-                .subject(SSL_CONTEXT_SERVICE.getDisplayName())
-                .valid(false)
-                .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service")
-                .build());
-        }
-
-        final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
-        if (enableAutoCommit != null && !enableAutoCommit.toLowerCase().equals("false")) {
-            results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)
-                .explanation("Enable auto commit must be false. It is managed by the processor.").build());
-        }
-
-        final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
-            results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)
-                .explanation("Key Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + keySerializer + "'").build());
-        }
-
-        final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
-            results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)
-                .explanation("Value Serializer must be " + ByteArraySerializer.class.getName() + "' was '" + valueSerializer + "'").build());
-        }
-
-        final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
-            results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)
-                .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' was '" + keyDeSerializer + "'").build());
-        }
-
-        final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
-        if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
-            results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)
-                .explanation("Value De-Serializer must be " + ByteArrayDeserializer.class.getName() + "' was '" + valueDeSerializer + "'").build());
-        }
-
-        return results;
-    }
-
-    public static final class KafkaConfigValidator implements Validator {
-
-        final Class<?> classType;
-
-        public KafkaConfigValidator(final Class<?> classType) {
-            this.classType = classType;
-        }
-
-        @Override
-        public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
-            if (subject.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
-                return new ValidationResult.Builder().valid(true).build();
-            }
-
-            final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
-            return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this kafka client").valid(knownValue).build();
-        }
-    }
-
-    /**
-     * Builds transit URI for provenance event. The transit URI will be in the
-     * form of &lt;security.protocol&gt;://&lt;bootstrap.servers&gt;/topic
-     */
-    static String buildTransitURI(String securityProtocol, String brokers, String topic) {
-        StringBuilder builder = new StringBuilder();
-        builder.append(securityProtocol);
-        builder.append("://");
-        builder.append(brokers);
-        builder.append("/");
-        builder.append(topic);
-        return builder.toString();
-    }
-
-
-    static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
-        for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
-            if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
-                // Translate SSLContext Service configuration into Kafka properties
-                final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-                if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
-                    final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
-                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
-                }
-
-                if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
-                }
-            }
-
-            String propertyName = propertyDescriptor.getName();
-            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
-                    ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
-                    : context.getProperty(propertyDescriptor).getValue();
-
-            if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())
-                && !propertyName.startsWith(ConsumerPartitionsUtil.PARTITION_PROPERTY_NAME_PREFIX)) {
-
-                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
-                // or the standard NiFi time period such as "5 secs"
-                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
-                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
-                }
-
-                if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(propertyName, propertyValue);
-                }
-            }
-        }
-
-        String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
-        if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            setJaasConfig(mapToPopulate, context);
-        }
-    }
-
-    /**
-     * Method used to create a transactional id Supplier for KafkaProducer
-     *
-     * @param prefix String transactional id prefix, can be null
-     * @return A Supplier that generates transactional id
-     */
-    static Supplier<String> getTransactionalIdSupplier(String prefix) {
-        return () -> (prefix == null ? "" : prefix)  + UUID.randomUUID().toString();
-    }
-
-    /**
-     * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
-     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
-     * <br />
-     * It expects something with the following format: <br />
-     * <br />
-     * &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; *(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
-     * ControlFlag = required / requisite / sufficient / optional
-     *
-     * @param mapToPopulate Map of configuration properties
-     * @param context Context
-     */
-    private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
-        final String saslMechanism = context.getProperty(SASL_MECHANISM).getValue();
-        switch (saslMechanism) {
-            case GSSAPI_VALUE:
-                setGssApiJaasConfig(mapToPopulate, context);
-                break;
-            case PLAIN_VALUE:
-                setPlainJaasConfig(mapToPopulate, context);
-                break;
-            case SCRAM_SHA256_VALUE:
-            case SCRAM_SHA512_VALUE:
-                setScramJaasConfig(mapToPopulate, context);
-                break;
-            default:
-                throw new IllegalStateException("Unknown " + SASL_MECHANISM.getDisplayName() + ": " + saslMechanism);
-        }
-    }
-
-    private static void setGssApiJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
-        String keytab = context.getProperty(USER_KEYTAB).evaluateAttributeExpressions().getValue();
-        String principal = context.getProperty(USER_PRINCIPAL).evaluateAttributeExpressions().getValue();
-
-        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
-        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
-        final KerberosCredentialsService credentialsService = context.getProperty(KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-        if (credentialsService != null) {
-            principal = credentialsService.getPrincipal();
-            keytab = credentialsService.getKeytab();
-        }
-
-
-        String serviceName = context.getProperty(JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-        if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
-            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
-                    + "useTicketCache=false "
-                    + "renewTicket=true "
-                    + "serviceName=\"" + serviceName + "\" "
-                    + "useKeyTab=true "
-                    + "keyTab=\"" + keytab + "\" "
-                    + "principal=\"" + principal + "\";");
-        }
-    }
-
-    private static void setPlainJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
-        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
-        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
-
-        mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
-                + "username=\"" + username + "\" "
-                + "password=\"" + password + "\";");
-    }
-
-    private static void setScramJaasConfig(final Map<String, Object> mapToPopulate, final ProcessContext context) {
-        final String username = context.getProperty(USERNAME).evaluateAttributeExpressions().getValue();
-        final String password = context.getProperty(PASSWORD).evaluateAttributeExpressions().getValue();
-
-        final StringBuilder builder = new StringBuilder("org.apache.kafka.common.security.scram.ScramLoginModule required ")
-                .append("username=\"" + username + "\" ")
-                .append("password=\"" + password + "\"");
-
-        final Boolean tokenAuth = context.getProperty(TOKEN_AUTH).asBoolean();
-        if (tokenAuth != null && tokenAuth) {
-            builder.append(" tokenauth=\"true\"");
-        }
-
-        builder.append(";");
-        mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, builder.toString());
-    }
-
-    public static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
-        return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
-    }
-
-    private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
-        final Set<String> strings = new HashSet<>();
-        for (final Class<?> classType : classes) {
-            for (final Field field : classType.getDeclaredFields()) {
-                if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
-                    try {
-                        strings.add(String.valueOf(field.get(null)));
-                    } catch (IllegalArgumentException | IllegalAccessException ex) {
-                        //ignore
-                    }
-                }
-            }
-        }
-        return strings;
-    }
-
-}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
index f659bb456c..b4f398464c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_2_0.java
@@ -35,13 +35,20 @@ import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.record.path.RecordPath;
@@ -57,13 +64,10 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -79,8 +83,6 @@ import java.util.regex.Pattern;
 import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
 
 @Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "2.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 2.0 Producer API. "
@@ -95,7 +97,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILUR
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success.")
 @SeeAlso({PublishKafka_2_0.class, ConsumeKafka_2_0.class, ConsumeKafkaRecord_2_0.class})
-public class PublishKafkaRecord_2_0 extends AbstractProcessor {
+public class PublishKafkaRecord_2_0 extends AbstractProcessor implements KafkaPublishComponent {
     protected static final String MSG_COUNT = "msg.count";
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
@@ -286,26 +288,26 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        properties.add(BOOTSTRAP_SERVERS);
         properties.add(TOPIC);
         properties.add(RECORD_READER);
         properties.add(RECORD_WRITER);
         properties.add(USE_TRANSACTIONS);
-        properties.add(KafkaProcessorUtils.FAILURE_STRATEGY);
+        properties.add(FAILURE_STRATEGY);
         properties.add(TRANSACTIONAL_ID_PREFIX);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(ATTRIBUTE_NAME_REGEX);
         properties.add(MESSAGE_HEADER_ENCODING);
-        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.SASL_MECHANISM);
-        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
-        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
-        properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
-        properties.add(KafkaProcessorUtils.USER_KEYTAB);
-        properties.add(KafkaProcessorUtils.USERNAME);
-        properties.add(KafkaProcessorUtils.PASSWORD);
-        properties.add(KafkaProcessorUtils.TOKEN_AUTH);
-        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        properties.add(SECURITY_PROTOCOL);
+        properties.add(SASL_MECHANISM);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_SERVICE_NAME);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_KEYTAB);
+        properties.add(SASL_USERNAME);
+        properties.add(SASL_PASSWORD);
+        properties.add(TOKEN_AUTHENTICATION);
+        properties.add(SSL_CONTEXT_SERVICE);
         properties.add(MESSAGE_KEY_FIELD);
         properties.add(MAX_REQUEST_SIZE);
         properties.add(ACK_WAIT_TIME);
@@ -337,7 +339,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
         return new Builder()
             .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
             .name(propertyDescriptorName)
-            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
             .dynamic(true)
             .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
@@ -345,8 +347,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+        final List<ValidationResult> results = new ArrayList<>(new KafkaClientCustomValidationFunction().apply(validationContext));
 
         final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
         if (useTransactions) {
@@ -401,19 +402,19 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
     protected PublisherPool createPublisherPool(final ProcessContext context) {
         final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
-        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
         final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
         final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
         final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
-        Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
+        Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
 
         final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
         final Charset charset = Charset.forName(charsetName);
 
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> kafkaProperties = propertyProvider.getProperties(context);
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
@@ -443,8 +444,8 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
             return;
         }
 
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
@@ -481,24 +482,20 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
                     final Function<Record, Integer> partitioner = getPartitioner(context, flowFile);
 
                     try {
-                        session.read(flowFile, new InputStreamCallback() {
-                            @Override
-                            public void process(final InputStream in) throws IOException {
-                                try {
-                                    final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
-                                    final RecordSet recordSet = reader.createRecordSet();
-
-                                    final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
-                                    lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
-                                } catch (final SchemaNotFoundException | MalformedRecordException e) {
-                                    throw new ProcessException(e);
-                                }
+                        session.read(flowFile, in -> {
+                            try {
+                                final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+                                final RecordSet recordSet = reader.createRecordSet();
+
+                                final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
+                                lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic, partitioner);
+                            } catch (final SchemaNotFoundException | MalformedRecordException e) {
+                                throw new ProcessException(e);
                             }
                         });
                     } catch (final Exception e) {
                         // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
                         lease.fail(flowFile, e);
-                        continue;
                     }
                 }
 
@@ -520,7 +517,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
                     success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                     session.adjustCounter("Messages Sent", msgCount, true);
 
-                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                    final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, topic);
                     session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                     session.transfer(success, REL_SUCCESS);
                 }
@@ -564,7 +561,7 @@ public class PublishKafkaRecord_2_0 extends AbstractProcessor {
 
     private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
         final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
-        if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+        if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {
             return (session, flowFiles) -> session.rollback();
         } else {
             return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
index f3260c1f26..9bc312fc9e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_2_0.java
@@ -34,26 +34,33 @@ import org.apache.nifi.components.PropertyDescriptor.Builder;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute;
+import org.apache.nifi.kafka.shared.attribute.StandardTransitUriProvider;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
+import org.apache.nifi.kafka.shared.property.KeyEncoding;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.FlowFileFilters;
 import org.apache.nifi.processor.util.StandardValidators;
 
 import javax.xml.bind.DatatypeConverter;
 import java.io.BufferedInputStream;
-import java.io.IOException;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -66,8 +73,6 @@ import java.util.regex.Pattern;
 import static org.apache.nifi.expression.ExpressionLanguageScope.FLOWFILE_ATTRIBUTES;
 import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
 import static org.apache.nifi.expression.ExpressionLanguageScope.VARIABLE_REGISTRY;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY;
-import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK;
 
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "2.0"})
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 2.0 Producer API."
@@ -83,7 +88,7 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.FAILUR
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
     + "be greater than 1.")
-public class PublishKafka_2_0 extends AbstractProcessor {
+public class PublishKafka_2_0 extends AbstractProcessor implements KafkaPublishComponent {
     protected static final String MSG_COUNT = "msg.count";
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
@@ -107,10 +112,6 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         "Interprets the <Partition> property as Expression Language that will be evaluated against each FlowFile. This Expression will be evaluated once against the FlowFile, " +
             "so all Records in a given FlowFile will go to the same partition.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-        "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
     static final PropertyDescriptor TOPIC = new Builder()
         .name("topic")
         .displayName("Topic Name")
@@ -177,10 +178,10 @@ public class PublishKafka_2_0 extends AbstractProcessor {
     static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new Builder()
         .name("key-attribute-encoding")
         .displayName("Key Attribute Encoding")
-        .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+        .description("FlowFiles that are emitted have an attribute named '" + KafkaFlowFileAttribute.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
         .required(true)
-        .defaultValue(UTF8_ENCODING.getValue())
-        .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+        .defaultValue(KeyEncoding.UTF8.getValue())
+        .allowableValues(KeyEncoding.class)
         .build();
 
     static final PropertyDescriptor MESSAGE_DEMARCATOR = new Builder()
@@ -280,7 +281,17 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
     static {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+        properties.add(BOOTSTRAP_SERVERS);
+        properties.add(SECURITY_PROTOCOL);
+        properties.add(SASL_MECHANISM);
+        properties.add(KERBEROS_SERVICE_NAME);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_PRINCIPAL);
+        properties.add(KERBEROS_KEYTAB);
+        properties.add(SASL_USERNAME);
+        properties.add(SASL_PASSWORD);
+        properties.add(TOKEN_AUTHENTICATION);
+        properties.add(SSL_CONTEXT_SERVICE);
         properties.add(TOPIC);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(FAILURE_STRATEGY);
@@ -321,7 +332,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         return new Builder()
             .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
             .name(propertyDescriptorName)
-            .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+            .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
             .dynamic(true)
             .expressionLanguageSupported(VARIABLE_REGISTRY)
             .build();
@@ -329,8 +340,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-        results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+        final List<ValidationResult> results = new ArrayList<>(new KafkaClientCustomValidationFunction().apply(validationContext));
 
         final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
         if (useTransactions) {
@@ -371,19 +381,19 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
     protected PublisherPool createPublisherPool(final ProcessContext context) {
         final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
-        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+        final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
         final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
         final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
         final String transactionalIdPrefix = context.getProperty(TRANSACTIONAL_ID_PREFIX).evaluateAttributeExpressions().getValue();
-        Supplier<String> transactionalIdSupplier = KafkaProcessorUtils.getTransactionalIdSupplier(transactionalIdPrefix);
+        Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
 
         final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
         final Charset charset = Charset.forName(charsetName);
 
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> kafkaProperties = propertyProvider.getProperties(context);
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
@@ -415,8 +425,8 @@ public class PublishKafka_2_0 extends AbstractProcessor {
             return;
         }
 
-        final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+        final String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+        final String bootstrapServers = context.getProperty(BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
         final PublishFailureStrategy failureStrategy = getFailureStrategy(context);
 
@@ -451,12 +461,9 @@ public class PublishKafka_2_0 extends AbstractProcessor {
                     }
 
                     final Integer partition = getPartition(context, flowFile);
-                    session.read(flowFile, new InputStreamCallback() {
-                        @Override
-                        public void process(final InputStream rawIn) throws IOException {
-                            try (final InputStream in = new BufferedInputStream(rawIn)) {
-                                lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
-                            }
+                    session.read(flowFile, rawIn -> {
+                        try (final InputStream in = new BufferedInputStream(rawIn)) {
+                            lease.publish(flowFile, in, messageKey, demarcatorBytes, topic, partition);
                         }
                     });
                 }
@@ -479,7 +486,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
                     success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
                     session.adjustCounter("Messages Sent", msgCount, true);
 
-                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+                    final String transitUri = StandardTransitUriProvider.getTransitUri(securityProtocol, bootstrapServers, topic);
                     session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
                     session.transfer(success, REL_SUCCESS);
                 }
@@ -494,7 +501,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
 
     private PublishFailureStrategy getFailureStrategy(final ProcessContext context) {
         final String strategy = context.getProperty(FAILURE_STRATEGY).getValue();
-        if (FAILURE_STRATEGY_ROLLBACK.getValue().equals(strategy)) {
+        if (FailureStrategy.ROLLBACK.getValue().equals(strategy)) {
             return (session, flowFiles) -> session.rollback();
         } else {
             return (session, flowFiles) -> session.transfer(flowFiles, REL_FAILURE);
@@ -507,7 +514,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         if (context.getProperty(KEY).isSet()) {
             uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
         } else {
-            uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+            uninterpretedKey = flowFile.getAttribute(KafkaFlowFileAttribute.KAFKA_KEY);
         }
 
         if (uninterpretedKey == null) {
@@ -515,7 +522,7 @@ public class PublishKafka_2_0 extends AbstractProcessor {
         }
 
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
-        if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+        if (KeyEncoding.UTF8.getValue().equals(keyEncoding)) {
             return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
         }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_0.java
index dba0c61043..53f010ced8 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/main/java/org/apache/nifi/record/sink/kafka/KafkaRecordSink_2_0.java
@@ -16,14 +16,10 @@
  */
 package org.apache.nifi.record.sink.kafka;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
@@ -39,10 +35,13 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.kerberos.KerberosCredentialsService;
+import org.apache.nifi.kafka.shared.component.KafkaPublishComponent;
+import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
+import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
+import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.util.StandardValidators;
-import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriter;
@@ -51,18 +50,14 @@ import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
-import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
-import org.apache.nifi.util.FormatUtils;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -76,7 +71,7 @@ import java.util.concurrent.TimeoutException;
                 + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
                 + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
         expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
-public class KafkaRecordSink_2_0 extends AbstractControllerService implements RecordSinkService {
+public class KafkaRecordSink_2_0 extends AbstractControllerService implements RecordSinkService, KafkaPublishComponent {
 
     static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
             "Records are considered 'transmitted unsuccessfully' unless the message is replicated to the appropriate "
@@ -89,10 +84,6 @@ public class KafkaRecordSink_2_0 extends AbstractControllerService implements Re
             "Records are considered 'transmitted successfully' after successfully writing the content to a Kafka node, "
                     + "without waiting for a response. This provides the best performance but may result in data loss.");
 
-    static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
-    static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
-            "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
-
     static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name")
@@ -171,17 +162,18 @@ public class KafkaRecordSink_2_0 extends AbstractControllerService implements Re
     private volatile Producer<byte[], byte[]> producer;
 
     @Override
-    protected void init(final ControllerServiceInitializationContext context) throws InitializationException {
+    protected void init(final ControllerServiceInitializationContext context) {
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+        properties.add(BOOTSTRAP_SERVERS);
         properties.add(TOPIC);
         properties.add(RecordSinkService.RECORD_WRITER_FACTORY);
         properties.add(DELIVERY_GUARANTEE);
         properties.add(MESSAGE_HEADER_ENCODING);
-        properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
-        properties.add(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE);
-        properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
-        properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+        properties.add(SECURITY_PROTOCOL);
+        properties.add(SASL_MECHANISM);
+        properties.add(KERBEROS_CREDENTIALS_SERVICE);
+        properties.add(KERBEROS_SERVICE_NAME);
+        properties.add(SSL_CONTEXT_SERVICE);
         properties.add(MAX_REQUEST_SIZE);
         properties.add(ACK_WAIT_TIME);
         properties.add(METADATA_WAIT_TIME);
@@ -199,7 +191,7 @@ public class KafkaRecordSink_2_0 extends AbstractControllerService implements Re
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName)
-                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+                .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
                 .dynamic(true)
                 .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
@@ -207,7 +199,7 @@ public class KafkaRecordSink_2_0 extends AbstractControllerService implements Re
 
     @Override
     protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        return KafkaProcessorUtils.validateCommonProperties(validationContext);
+        return new KafkaClientCustomValidationFunction().apply(validationContext);
     }
 
     @OnEnabled
@@ -217,11 +209,8 @@ public class KafkaRecordSink_2_0 extends AbstractControllerService implements Re
         maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
         maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);
 
-        final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
-        final Charset charset = Charset.forName(charsetName);
-
-        final Map<String, Object> kafkaProperties = new HashMap<>();
-        buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
+        final Map<String, Object> kafkaProperties = propertyProvider.getProperties(context);
         kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
         kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
@@ -299,93 +288,12 @@ public class KafkaRecordSink_2_0 extends AbstractControllerService implements Re
     }
 
     @OnDisabled
-    public void stop() throws IOException {
+    public void stop() {
         if (producer != null) {
             producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
         }
     }
 
-    static void buildCommonKafkaProperties(final ConfigurationContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
-        for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
-            if (propertyDescriptor.equals(KafkaProcessorUtils.SSL_CONTEXT_SERVICE)) {
-                // Translate SSLContext Service configuration into Kafka properties
-                final SSLContextService sslContextService = context.getProperty(KafkaProcessorUtils.SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
-                if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
-                    final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
-                    mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
-                    mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
-                }
-
-                if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword());
-                    mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
-                }
-            }
-
-            String propertyName = propertyDescriptor.getName();
-            String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
-                    ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
-                    : context.getProperty(propertyDescriptor).getValue();
-
-            if (propertyValue != null) {
-                // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
-                // or the standard NiFi time period such as "5 secs"
-                if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
-                    propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
-                }
-
-                if (KafkaProcessorUtils.isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
-                    mapToPopulate.put(propertyName, propertyValue);
-                }
-            }
-        }
-
-        String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        if (KafkaProcessorUtils.SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || KafkaProcessorUtils.SEC_SASL_SSL.getValue().equals(securityProtocol)) {
-            setJaasConfig(mapToPopulate, context);
-        }
-    }
-
-    /**
-     * Method used to configure the 'sasl.jaas.config' property based on KAFKA-4259<br />
-     * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
-     * <br />
-     * It expects something with the following format: <br />
-     * <br />
-     * &lt;LoginModuleClass&gt; &lt;ControlFlag&gt; *(&lt;OptionName&gt;=&lt;OptionValue&gt;); <br />
-     * ControlFlag = required / requisite / sufficient / optional
-     *
-     * @param mapToPopulate Map of configuration properties
-     * @param context       Context
-     */
-    private static void setJaasConfig(Map<String, Object> mapToPopulate, ConfigurationContext context) {
-        String keytab = null;
-        String principal = null;
-
-        // If the Kerberos Credentials Service is specified, we need to use its configuration, not the explicit properties for principal/keytab.
-        // The customValidate method ensures that only one can be set, so we know that the principal & keytab above are null.
-        final KerberosCredentialsService credentialsService = context.getProperty(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE).asControllerService(KerberosCredentialsService.class);
-        if (credentialsService != null) {
-            principal = credentialsService.getPrincipal();
-            keytab = credentialsService.getKeytab();
-        }
-
-
-        String serviceName = context.getProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME).evaluateAttributeExpressions().getValue();
-        if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
-            mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
-                    + "useTicketCache=false "
-                    + "renewTicket=true "
-                    + "serviceName=\"" + serviceName + "\" "
-                    + "useKeyTab=true "
-                    + "keyTab=\"" + keytab + "\" "
-                    + "principal=\"" + principal + "\";");
-        }
-    }
-
     // this getter is intended explicitly for testing purposes
     protected RecordSetWriterFactory getWriterFactory() {
         return this.writerFactory;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
index c4e8813d8f..ae1c03dd6e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ITConsumeKafka_2_0.java
@@ -42,7 +42,7 @@ public class ITConsumeKafka_2_0 {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
+    public void validateGetAllMessages() {
         String groupName = "validateGetAllMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -56,7 +56,7 @@ public class ITConsumeKafka_2_0 {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_2_0.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
@@ -72,7 +72,7 @@ public class ITConsumeKafka_2_0 {
     }
 
     @Test
-    public void validateGetAllMessagesPattern() throws Exception {
+    public void validateGetAllMessagesPattern() {
         String groupName = "validateGetAllMessagesPattern";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -86,7 +86,7 @@ public class ITConsumeKafka_2_0 {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_2_0.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_2_0.TOPICS, "(fo.*)|(ba)");
         runner.setProperty(ConsumeKafka_2_0.TOPIC_TYPE, "pattern");
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
@@ -103,7 +103,7 @@ public class ITConsumeKafka_2_0 {
     }
 
     @Test
-    public void validateGetErrorMessages() throws Exception {
+    public void validateGetErrorMessages() {
         String groupName = "validateGetErrorMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -117,7 +117,7 @@ public class ITConsumeKafka_2_0 {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_2_0.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
index e57d0705b8..2275cbdba7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_2_0.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
@@ -58,7 +60,7 @@ public class TestConsumeKafkaRecord_2_0 {
         };
 
         runner = TestRunners.newTestRunner(proc);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafkaRecord_2_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
 
         final String readerId = "record-reader";
         final MockRecordParser readerService = new MockRecordParser();
@@ -77,24 +79,18 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
-    public void validateCustomValidatorSettings() throws Exception {
+    public void validateCustomValidatorSettings() {
         runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
         runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
-        runner.assertNotValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
-        runner.assertValid();
         runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        runner.assertNotValid();
     }
 
     @Test
-    public void validatePropertiesValidation() throws Exception {
+    public void validatePropertiesValidation() {
         runner.setProperty(ConsumeKafkaRecord_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
@@ -116,7 +112,7 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
-    public void validateGetAllMessages() throws Exception {
+    public void validateGetAllMessages() {
         String groupName = "validateGetAllMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -138,7 +134,7 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
-    public void validateGetAllMessagesPattern() throws Exception {
+    public void validateGetAllMessagesPattern() {
         String groupName = "validateGetAllMessagesPattern";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -161,7 +157,7 @@ public class TestConsumeKafkaRecord_2_0 {
     }
 
     @Test
-    public void validateGetErrorMessages() throws Exception {
+    public void validateGetErrorMessages() {
         String groupName = "validateGetErrorMessages";
 
         when(mockConsumerPool.obtainConsumer(any(), any())).thenReturn(mockLease);
@@ -188,19 +184,19 @@ public class TestConsumeKafkaRecord_2_0 {
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
-        runner.assertValid();
+        runner.setProperty(ConsumeKafkaRecord_2_0.KERBEROS_SERVICE_NAME, "kafka");
+        runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.setProperty(ConsumeKafkaRecord_2_0.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.setProperty(ConsumeKafkaRecord_2_0.KERBEROS_KEYTAB, "not.A.File");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.setProperty(ConsumeKafkaRecord_2_0.KERBEROS_KEYTAB, "src/test/resources/server.properties");
         runner.assertValid();
     }
 
@@ -210,19 +206,19 @@ public class TestConsumeKafkaRecord_2_0 {
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.PLAIN_VALUE);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_MECHANISM, SaslMechanism.PLAIN.getValue());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_USERNAME, "user1");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_PASSWORD, "password");
         runner.assertValid();
 
-        runner.removeProperty(KafkaProcessorUtils.USERNAME);
+        runner.removeProperty(ConsumeKafkaRecord_2_0.SASL_USERNAME);
         runner.assertNotValid();
     }
 
@@ -232,19 +228,19 @@ public class TestConsumeKafkaRecord_2_0 {
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.SCRAM_SHA256_VALUE);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_256.getValue());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_USERNAME, "user1");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_PASSWORD, "password");
         runner.assertValid();
 
-        runner.removeProperty(KafkaProcessorUtils.USERNAME);
+        runner.removeProperty(ConsumeKafkaRecord_2_0.SASL_USERNAME);
         runner.assertNotValid();
     }
 
@@ -254,19 +250,19 @@ public class TestConsumeKafkaRecord_2_0 {
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.SASL_MECHANISM, KafkaProcessorUtils.SCRAM_SHA512_VALUE);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_MECHANISM, SaslMechanism.SCRAM_SHA_512.getValue());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USERNAME, "user1");
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_USERNAME, "user1");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.PASSWORD, "password");
+        runner.setProperty(ConsumeKafkaRecord_2_0.SASL_PASSWORD, "password");
         runner.assertValid();
 
-        runner.removeProperty(KafkaProcessorUtils.USERNAME);
+        runner.removeProperty(ConsumeKafkaRecord_2_0.SASL_USERNAME);
         runner.assertNotValid();
     }
 
@@ -276,7 +272,7 @@ public class TestConsumeKafkaRecord_2_0 {
         runner.setProperty(ConsumeKafkaRecord_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafkaRecord_2_0.AUTO_OFFSET_RESET, ConsumeKafkaRecord_2_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_PLAINTEXT);
+        runner.setProperty(ConsumeKafkaRecord_2_0.SECURITY_PROTOCOL, SecurityProtocol.PLAINTEXT.name());
         runner.assertValid();
     }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
index 6d5f44f202..560af84170 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafka_2_0.java
@@ -18,6 +18,8 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.nifi.kafka.shared.property.SaslMechanism;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.jupiter.api.BeforeEach;
@@ -39,80 +41,77 @@ public class TestConsumeKafka_2_0 {
     }
 
     @Test
-    public void validateCustomValidatorSettings() throws Exception {
+    public void validateCustomValidatorSettings() {
         ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_2_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
         runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
         runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Foo");
-        runner.assertNotValid();
         runner.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         runner.assertValid();
         runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         runner.assertValid();
-        runner.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-        runner.assertNotValid();
     }
 
     @Test
-    public void validatePropertiesValidation() throws Exception {
+    public void validatePropertiesValidation() {
         ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_2_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
         runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
 
         runner.removeProperty(ConsumeKafka_2_0.GROUP_ID);
 
-        AssertionError e = assertThrows(AssertionError.class, () -> runner.assertValid());
+        AssertionError e = assertThrows(AssertionError.class, runner::assertValid);
         assertTrue(e.getMessage().contains("invalid because Group ID is required"));
 
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "");
 
-        e = assertThrows(AssertionError.class, () -> runner.assertValid());
+        e = assertThrows(AssertionError.class, runner::assertValid);
         assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
 
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "  ");
 
-        e = assertThrows(AssertionError.class, () -> runner.assertValid());
+        e = assertThrows(AssertionError.class, runner::assertValid);
         assertTrue(e.getMessage().contains("must contain at least one character that is not white space"));
     }
 
     @Test
-    public void testJaasConfiguration() throws Exception {
+    public void testJaasConfiguration() {
         ConsumeKafka_2_0 consumeKafka = new ConsumeKafka_2_0();
         TestRunner runner = TestRunners.newTestRunner(consumeKafka);
-        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "okeydokey:1234");
+        runner.setProperty(ConsumeKafka_2_0.BOOTSTRAP_SERVERS, "okeydokey:1234");
         runner.setProperty(ConsumeKafka_2_0.TOPICS, "foo");
         runner.setProperty(ConsumeKafka_2_0.GROUP_ID, "foo");
         runner.setProperty(ConsumeKafka_2_0.AUTO_OFFSET_RESET, ConsumeKafka_2_0.OFFSET_EARLIEST);
 
-        runner.setProperty(KafkaProcessorUtils.SECURITY_PROTOCOL, KafkaProcessorUtils.SEC_SASL_PLAINTEXT);
+        runner.setProperty(ConsumeKafka_2_0.SECURITY_PROTOCOL, SecurityProtocol.SASL_PLAINTEXT.name());
+        runner.setProperty(ConsumeKafka_2_0.SASL_MECHANISM, SaslMechanism.GSSAPI.getValue());
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "kafka");
-        runner.assertValid();
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_SERVICE_NAME, "kafka");
+        runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "nifi@APACHE.COM");
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_PRINCIPAL, "nifi@APACHE.COM");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "not.A.File");
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_KEYTAB, "not.A.File");
         runner.assertNotValid();
 
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "src/test/resources/server.properties");
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_KEYTAB, "src/test/resources/server.properties");
         runner.assertValid();
 
         runner.setVariable("keytab", "src/test/resources/server.properties");
         runner.setVariable("principal", "nifi@APACHE.COM");
         runner.setVariable("service", "kafka");
-        runner.setProperty(KafkaProcessorUtils.USER_PRINCIPAL, "${principal}");
-        runner.setProperty(KafkaProcessorUtils.USER_KEYTAB, "${keytab}");
-        runner.setProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME, "${service}");
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_PRINCIPAL, "${principal}");
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_KEYTAB, "${keytab}");
+        runner.setProperty(ConsumeKafka_2_0.KERBEROS_SERVICE_NAME, "${service}");
         runner.assertValid();
     }
 
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
index 4f53bab922..b70dd41a81 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumerPartitionsUtil.java
@@ -42,7 +42,7 @@ public class TestConsumerPartitionsUtil {
 
     @BeforeEach
     public void setup() throws UnknownHostException {
-        hostname = InetAddress.getLocalHost().getHostName();;
+        hostname = InetAddress.getLocalHost().getHostName();
     }
 
     @Test
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
index 0feed620df..b3110337de 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_2_0.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.kafka.pubsub.util.MockRecordParser;
 import org.apache.nifi.reporting.InitializationException;
@@ -155,7 +156,7 @@ public class TestPublishKafkaRecord_2_0 {
 
     @Test
     public void testSingleFailureWithRollback() throws IOException {
-        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.setProperty(PublishKafkaRecord_2_0.FAILURE_STRATEGY, FailureStrategy.ROLLBACK.getValue());
 
         final MockFlowFile flowFile = runner.enqueue("John Doe, 48");
 
@@ -189,7 +190,7 @@ public class TestPublishKafkaRecord_2_0 {
 
     @Test
     public void testFailureWhenCreatingTransactionWithRollback() {
-        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.setProperty(PublishKafkaRecord_2_0.FAILURE_STRATEGY, FailureStrategy.ROLLBACK.getValue());
         runner.enqueue("John Doe, 48");
 
         doAnswer((Answer<Object>) invocationOnMock -> {
@@ -224,7 +225,7 @@ public class TestPublishKafkaRecord_2_0 {
 
     @Test
     public void testMultipleFailuresWithRollback() throws IOException {
-        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.setProperty(PublishKafkaRecord_2_0.FAILURE_STRATEGY, FailureStrategy.ROLLBACK.getValue());
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("John Doe, 48"));
         flowFiles.add(runner.enqueue("John Doe, 48"));
@@ -423,7 +424,7 @@ public class TestPublishKafkaRecord_2_0 {
             @Override
             public int getSuccessfulMessageCount(FlowFile flowFile) {
                 Integer count = msgCounts.get(flowFile);
-                return count == null ? 0 : count.intValue();
+                return count == null ? 0 : count;
             }
 
             @Override
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
index 1433a896d8..92fb25da32 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafka_2_0.java
@@ -19,6 +19,7 @@ package org.apache.nifi.processors.kafka.pubsub;
 
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.kafka.shared.property.FailureStrategy;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
@@ -139,7 +140,7 @@ public class TestPublishKafka_2_0 {
 
     @Test
     public void testSingleFailureWithRollback() throws IOException {
-        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.setProperty(PublishKafka_2_0.FAILURE_STRATEGY, FailureStrategy.ROLLBACK.getValue());
         final MockFlowFile flowFile = runner.enqueue("hello world");
 
         when(mockLease.complete()).thenReturn(createFailurePublishResult(flowFile));
@@ -172,7 +173,7 @@ public class TestPublishKafka_2_0 {
 
     @Test
     public void testMultipleFailuresWithRollback() throws IOException {
-        runner.setProperty(KafkaProcessorUtils.FAILURE_STRATEGY, KafkaProcessorUtils.FAILURE_STRATEGY_ROLLBACK);
+        runner.setProperty(PublishKafka_2_0.FAILURE_STRATEGY, FailureStrategy.ROLLBACK.getValue());
 
         final Set<FlowFile> flowFiles = new HashSet<>();
         flowFiles.add(runner.enqueue("hello world"));
@@ -293,7 +294,7 @@ public class TestPublishKafka_2_0 {
             @Override
             public int getSuccessfulMessageCount(FlowFile flowFile) {
                 Integer count = msgCounts.get(flowFile);
-                return count == null ? 0 : count.intValue();
+                return count == null ? 0 : count;
             }
 
             @Override
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_0.java
index d1bd0ff795..4eb0e08504 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/java/org/apache/nifi/record/sink/kafka/TestKafkaRecordSink_2_0.java
@@ -27,10 +27,10 @@ import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.StateManager;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.kafka.shared.property.SecurityProtocol;
 import org.apache.nifi.kerberos.KerberosCredentialsService;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
-import org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
@@ -45,7 +45,7 @@ import org.apache.nifi.serialization.record.RecordSet;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockControllerServiceInitializationContext;
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.mockito.stubbing.Answer;
@@ -138,13 +138,13 @@ public class TestKafkaRecordSink_2_0 {
         when(context.getProperty(KafkaRecordSink_2_0.MESSAGE_HEADER_ENCODING)).thenReturn(charEncodingValue);
 
         final PropertyValue securityValue = Mockito.mock(StandardPropertyValue.class);
-        when(securityValue.getValue()).thenReturn(KafkaProcessorUtils.SEC_SASL_PLAINTEXT.getValue());
-        when(context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL)).thenReturn(securityValue);
+        when(securityValue.getValue()).thenReturn(SecurityProtocol.PLAINTEXT.name());
+        when(context.getProperty(KafkaRecordSink_2_0.SECURITY_PROTOCOL)).thenReturn(securityValue);
 
         final PropertyValue jaasValue = Mockito.mock(StandardPropertyValue.class);
         when(jaasValue.evaluateAttributeExpressions()).thenReturn(jaasValue);
         when(jaasValue.getValue()).thenReturn(null);
-        when(context.getProperty(KafkaProcessorUtils.JAAS_SERVICE_NAME)).thenReturn(jaasValue);
+        when(context.getProperty(KafkaRecordSink_2_0.KERBEROS_SERVICE_NAME)).thenReturn(jaasValue);
 
         Map<PropertyDescriptor, String> propertyMap = new HashMap<>();
         propertyMap.put(KafkaRecordSink_2_0.TOPIC, KafkaRecordSink_2_0.TOPIC.getName());
@@ -160,9 +160,9 @@ public class TestKafkaRecordSink_2_0 {
         MockRecordWriter writer = new MockRecordWriter(null, false);
         when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue);
         when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
-        when(context.getProperty(KafkaProcessorUtils.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
+        when(context.getProperty(KafkaRecordSink_2_0.SSL_CONTEXT_SERVICE)).thenReturn(pValue);
         when(pValue.asControllerService(SSLContextService.class)).thenReturn(null);
-        when(context.getProperty(KafkaProcessorUtils.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
+        when(context.getProperty(KafkaRecordSink_2_0.KERBEROS_CREDENTIALS_SERVICE)).thenReturn(pValue);
         when(pValue.asControllerService(KerberosCredentialsService.class)).thenReturn(null);
 
         final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(task, UUID.randomUUID().toString(), logger, stateManager);
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/resources/log4j.properties b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/resources/log4j.properties
deleted file mode 100644
index 57cd63f139..0000000000
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-0-processors/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-log4j.rootCategory=INFO, stdout
-
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ABSOLUTE} %5p %t %c{2}:%L - %m%n
-
-#og4j.category.org.apache.nifi.processors.kafka=DEBUG