Posted to commits@nifi.apache.org by sz...@apache.org on 2021/12/01 13:27:05 UTC

[nifi-minifi-cpp] 01/03: MINIFICPP-1677 Add SASL options to Kafka processors

This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 767435de8be46bc5f5a1ae3ea180569a3bcaf8a2
Author: Gabor Gyimesi <ga...@gmail.com>
AuthorDate: Wed Dec 1 14:05:08 2021 +0100

    MINIFICPP-1677 Add SASL options to Kafka processors
    
    - Add new security options: SASL_PLAIN and SASL_SSL
    - Add SASL mechanism property with GSSAPI and PLAIN support
    - Add Username and Password properties for PLAIN mechanism
    - Extract common Kafka authentication code so that the ConsumeKafka processor gets the same authentication options as PublishKafka
    - Add Kafka broker configuration and tests for SASL/PLAIN and SASL/SSL with username and password authentication
    
    Closes #1212
    Signed-off-by: Marton Szasz <sz...@apache.org>
---
 PROCESSORS.md                                      |  15 +-
 docker/test/integration/features/kafka.feature     | 123 +++++++++++
 .../minifi/core/KafkaBrokerContainer.py            |   8 +-
 .../integration/resources/kafka_broker/Dockerfile  |   2 +-
 .../kafka_broker/conf/server-ssl.properties        | 155 --------------
 .../resources/kafka_broker/conf/server.properties  |  50 ++++-
 extensions/librdkafka/ConsumeKafka.cpp             |  76 ++-----
 extensions/librdkafka/ConsumeKafka.h               |  15 +-
 extensions/librdkafka/KafkaProcessorBase.cpp       | 135 ++++++++++++
 extensions/librdkafka/KafkaProcessorBase.h         |  69 ++++++
 extensions/librdkafka/PublishKafka.cpp             | 237 +++++----------------
 extensions/librdkafka/PublishKafka.h               |  25 ++-
 libminifi/include/utils/ProcessorConfigUtils.h     |  18 +-
 libminifi/src/utils/ProcessorConfigUtils.cpp       |  18 +-
 14 files changed, 501 insertions(+), 445 deletions(-)

diff --git a/PROCESSORS.md b/PROCESSORS.md
index 8c85ec9..2a1bcbb 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -253,17 +253,23 @@ In the list below, the names of required properties appear in bold. Any other pr
 |Headers To Add As Attributes|||A comma separated list to match against all message headers. Any message header whose name matches an item from the list will be added to the FlowFile as an Attribute. If not specified, no Header values will be added as FlowFile attributes. The behaviour on when multiple headers of the same name are present is set using the DuplicateHeaderHandling attribute.|
 |**Honor Transactions**|true||Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an "isolation level" of read_uncommitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some la [...]
 |**Kafka Brokers**|localhost:9092||A comma-separated list of known Kafka Brokers in the format <host>:<port>.<br/>**Supports Expression Language: true**|
+|Kerberos Keytab Path|||The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.|
+|Kerberos Principal|||Kerberos Principal|
+|Kerberos Service Name|||Kerberos Service Name|
 |**Key Attribute Encoding**|UTF-8|Hex<br>UTF-8<br>|FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.|
 |Max Poll Records|10000||Specifies the maximum number of records Kafka should return when polling each time the processor is triggered.|
 |**Max Poll Time**|4 seconds||Specifies the maximum amount of time the consumer can use for polling data from the brokers. Polling is a blocking operation, so the upper limit of this value is 4 seconds.|
 |Message Demarcator|||Since KafkaConsumer receives messages in batches, you have the option to output FlowFiles which contain all Kafka messages in a single batch for a given topic and partition, and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating multiple Kafka messages. This is an optional property; if not provided, each Kafka message received will result in a single FlowFile each time the processor is triggered. <br/>**Supports Expression Langua [...]
 |Message Header Encoding|UTF-8|Hex<br>UTF-8<br>|Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. This property indicates the Character Encoding to use for deserializing the headers.|
 |**Offset Reset**|latest|earliest<br>latest<br>none<br>|Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.|
-|SSL Context Service|||SSL Context Service Name|
-|**Security Protocol**|plaintext|plaintext<br>ssl|Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
+|Password|||The password for the given username when the SASL Mechanism is PLAIN|
+|SASL Mechanism|GSSAPI|GSSAPI<br/>PLAIN|The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.|
+|**Security Protocol**|plaintext|plaintext<br/>ssl<br/>sasl_plaintext<br/>sasl_ssl|Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
 |Session Timeout|60 seconds||Client group session and failure detection timeout. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker for a group member within the session timeout, the broker will remove the consumer from the group and trigger a rebalance. The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.|
+|SSL Context Service|||SSL Context Service Name|
 |**Topic Name Format**|Names|Names<br>Patterns<br>|Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression. Using regular expressions does not automatically discover Kafka topics created after the processor started.|
 |**Topic Names**|||The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.<br/>**Supports Expression Language: true**|
+|Username|||The username when the SASL Mechanism is PLAIN|
 ### Properties
 
 | Name | Description |
@@ -1187,16 +1193,19 @@ In the list below, the names of required properties appear in bold. Any other pr
 |Kafka Key|||The key to use for the message. If not specified, the UUID of the flow file is used as the message key.<br/>**Supports Expression Language: true**|
 |Message Key Field|||DEPRECATED, does not work -- use Kafka Key instead|
 |Message Timeout|30 sec||The total time sending a message could take|
+|Password|||The password for the given username when the SASL Mechanism is PLAIN|
 |Queue Buffering Max Time|||Delay to wait for messages in the producer queue to accumulate before constructing message batches|
 |Queue Max Buffer Size|||Maximum total message size sum allowed on the producer queue|
 |Queue Max Message|||Maximum number of messages allowed on the producer queue|
 |Request Timeout|10 sec||The ack timeout of the producer request|
+|SASL Mechanism|GSSAPI|GSSAPI<br/>PLAIN|The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.|
 |SSL Context Service|||SSL Context Service Name|
 |Security CA|||DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key|
 |Security Cert|||DEPRECATED in favor of SSL Context Service. Path to client's public key (PEM) used for authentication|
 |Security Pass Phrase|||DEPRECATED in favor of SSL Context Service. Private key passphrase|
 |Security Private Key|||DEPRECATED in favor of SSL Context Service. Path to client's private key (PEM) used for authentication|
-|Security Protocol|||Protocol used to communicate with brokers|
+|**Security Protocol**|plaintext|plaintext<br/>ssl<br/>sasl_plaintext<br/>sasl_ssl|Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.|
+|Username|||The username when the SASL Mechanism is PLAIN|
 |Target Batch Payload Size|512 KB||The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).|
 |**Topic Name**|||The Kafka Topic of interest<br/>**Supports Expression Language: true**|
 ### Relationships
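
The authentication properties documented in the two tables above are passed straight through to librdkafka. As a rough orientation, here is a small C++ sketch of the mapping; the authoritative list is KafkaProcessorBase::setKafkaAuthenticationParameters, added by this commit in extensions/librdkafka/KafkaProcessorBase.cpp.

    #include <array>
    #include <string_view>
    #include <utility>

    // Sketch: processor property name -> librdkafka configuration key, as applied
    // by KafkaProcessorBase::setKafkaAuthenticationParameters in this commit.
    constexpr std::array<std::pair<std::string_view, std::string_view>, 7> kAuthPropertyToKafkaKey{{
        {"Security Protocol",     "security.protocol"},
        {"SASL Mechanism",        "sasl.mechanism"},
        {"Kerberos Service Name", "sasl.kerberos.service.name"},
        {"Kerberos Principal",    "sasl.kerberos.principal"},
        {"Kerberos Keytab Path",  "sasl.kerberos.keytab"},
        {"Username",              "sasl.username"},
        {"Password",              "sasl.password"},
    }};
    // The SSL Context Service additionally supplies ssl.ca.location,
    // ssl.certificate.location, ssl.key.location and ssl.key.password
    // when the Security Protocol is ssl or sasl_ssl.
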
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
index ab4c34e..9825b55 100644
--- a/docker/test/integration/features/kafka.feature
+++ b/docker/test/integration/features/kafka.feature
@@ -87,6 +87,89 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     # We fallback to the flowfile's uuid as message key if the Kafka Key property is not set
     And the Minifi logs match the following regex: "PublishKafka: Message Key \[[a-z0-9]{8}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{12}\]" in less than 10 seconds
 
+  Scenario: A MiNiFi instance transfers data to a kafka broker through SASL Plain security protocol
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                         |
+      | PublishKafka   | Topic Name             | test                                   |
+      | PublishKafka   | Request Timeout        | 10 sec                                 |
+      | PublishKafka   | Message Timeout        | 12 sec                                 |
+      | PublishKafka   | Known Brokers          | kafka-broker:9094                      |
+      | PublishKafka   | Client Name            | LMN                                    |
+      | PublishKafka   | Security Protocol      | sasl_plaintext                         |
+      | PublishKafka   | SASL Mechanism         | PLAIN                                  |
+      | PublishKafka   | Username               | alice                                  |
+      | PublishKafka   | Password               | alice-secret                           |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: PublishKafka can connect via SASL SSL using security properties
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                             |
+      | PublishKafka   | Client Name            | LMN                                        |
+      | PublishKafka   | Known Brokers          | kafka-broker:9095                          |
+      | PublishKafka   | Topic Name             | test                                       |
+      | PublishKafka   | Batch Size             | 10                                         |
+      | PublishKafka   | Compress Codec         | none                                       |
+      | PublishKafka   | Delivery Guarantee     | 1                                          |
+      | PublishKafka   | Request Timeout        | 10 sec                                     |
+      | PublishKafka   | Message Timeout        | 12 sec                                     |
+      | PublishKafka   | Security CA            | /tmp/resources/certs/ca-cert               |
+      | PublishKafka   | Security Cert          | /tmp/resources/certs/client_LMN_client.pem |
+      | PublishKafka   | Security Pass Phrase   | abcdefgh                                   |
+      | PublishKafka   | Security Private Key   | /tmp/resources/certs/client_LMN_client.key |
+      | PublishKafka   | Security Protocol      | sasl_ssl                                   |
+      | PublishKafka   | SASL Mechanism         | PLAIN                                      |
+      | PublishKafka   | Username               | alice                                      |
+      | PublishKafka   | Password               | alice-secret                               |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: PublishKafka can connect via SASL SSL using an SSL Context Service
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                             |
+      | PublishKafka   | Client Name            | LMN                                        |
+      | PublishKafka   | Known Brokers          | kafka-broker:9095                          |
+      | PublishKafka   | Topic Name             | test                                       |
+      | PublishKafka   | Batch Size             | 10                                         |
+      | PublishKafka   | Compress Codec         | none                                       |
+      | PublishKafka   | Delivery Guarantee     | 1                                          |
+      | PublishKafka   | Request Timeout        | 10 sec                                     |
+      | PublishKafka   | Message Timeout        | 12 sec                                     |
+      | PublishKafka   | Security Protocol      | sasl_ssl                                   |
+      | PublishKafka   | SASL Mechanism         | PLAIN                                      |
+      | PublishKafka   | Username               | alice                                      |
+      | PublishKafka   | Password               | alice-secret                               |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And an ssl context service set up for PublishKafka
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
   Scenario: PublishKafka sends can use SSL connect with SSL Context Service
     Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
     And a file with the content "test" is present in "/tmp/input"
@@ -327,3 +410,43 @@ Feature: Sending data to using Kafka streaming platform using PublishKafka
     And a message with content "Lewis Carroll" is published to the "ConsumeKafkaTest" topic using an ssl connection
 
     Then two flowfiles with the contents "Alice's Adventures in Wonderland" and "Lewis Carroll" are placed in the monitored directory in less than 60 seconds
+
+  Scenario: ConsumeKafka receives data via SASL SSL
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And these processor properties are set:
+      | processor name | property name        | property value                             |
+      | ConsumeKafka   | Kafka Brokers        | kafka-broker:9095                          |
+      | ConsumeKafka   | Security Protocol    | sasl_ssl                                   |
+      | ConsumeKafka   | SASL Mechanism       | PLAIN                                      |
+      | ConsumeKafka   | Username             | alice                                      |
+      | ConsumeKafka   | Password             | alice-secret                               |
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And an ssl context service set up for ConsumeKafka
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+
+    And a kafka broker is set up in correspondence with the publisher flow
+
+    When all instances start up
+    And a message with content "Alice's Adventures in Wonderland" is published to the "ConsumeKafkaTest" topic using an ssl connection
+    And a message with content "Lewis Carroll" is published to the "ConsumeKafkaTest" topic using an ssl connection
+
+    Then two flowfiles with the contents "Alice's Adventures in Wonderland" and "Lewis Carroll" are placed in the monitored directory in less than 60 seconds
+
+  Scenario: MiNiFi consumes data from a kafka topic via SASL PLAIN connection
+    Given a ConsumeKafka processor set up in a "kafka-consumer-flow" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "kafka-consumer-flow" flow
+    And the "success" relationship of the ConsumeKafka processor is connected to the PutFile
+    And these processor properties are set:
+      | processor name | property name        | property value                             |
+      | ConsumeKafka   | Kafka Brokers        | kafka-broker:9094                          |
+      | ConsumeKafka   | Security Protocol    | sasl_plaintext                             |
+      | ConsumeKafka   | SASL Mechanism       | PLAIN                                      |
+      | ConsumeKafka   | Username             | alice                                      |
+      | ConsumeKafka   | Password             | alice-secret                               |
+
+    And a kafka broker is set up in correspondence with the third-party kafka publisher
+
+    When all instances start up
+    And a message with content "some test message" is published to the "ConsumeKafkaTest" topic
+
+    Then at least one flowfile with the content "some test message" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/minifi/core/KafkaBrokerContainer.py b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
index df215ef..64346c3 100644
--- a/docker/test/integration/minifi/core/KafkaBrokerContainer.py
+++ b/docker/test/integration/minifi/core/KafkaBrokerContainer.py
@@ -19,15 +19,15 @@ class KafkaBrokerContainer(Container):
             detach=True,
             name='kafka-broker',
             network=self.network.name,
-            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093},
+            ports={'9092/tcp': 9092, '29092/tcp': 29092, '9093/tcp': 9093, '29093/tcp': 29093, '9094/tcp': 9094, '29094/tcp': 29094, '9095/tcp': 9095, '29095/tcp': 29095},
             environment=[
                 "KAFKA_BROKER_ID=1",
                 "ALLOW_PLAINTEXT_LISTENER=yes",
                 "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true",
-                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092",
-                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL",
+                "KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093,SASL_PLAINTEXT://kafka-broker:9094,SASL_SSL://kafka-broker:9095,SSL_HOST://0.0.0.0:29093,PLAINTEXT_HOST://0.0.0.0:29092,SASL_PLAINTEXT_HOST://0.0.0.0:29094,SASL_SSL_HOST://0.0.0.0:29095",
+                "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,SSL:SSL,SSL_HOST:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_PLAINTEXT_HOST:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,SASL_SSL_HOST:SASL_SSL",
                 "KAFKA_SECURITY_INTER_BROKER_PROTOCOL=SSL",
-                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093",
+                "KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka-broker:9092,PLAINTEXT_HOST://localhost:29092,SSL://kafka-broker:9093,SSL_HOST://localhost:29093,SASL_PLAINTEXT://kafka-broker:9094,SASL_PLAINTEXT_HOST://localhost:29094,SASL_SSL://kafka-broker:9095,SASL_SSL_HOST://localhost:29095",
                 "KAFKA_HEAP_OPTS=-Xms512m -Xmx1g",
                 "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181",
                 "SSL_CLIENT_AUTH=none"])
diff --git a/docker/test/integration/resources/kafka_broker/Dockerfile b/docker/test/integration/resources/kafka_broker/Dockerfile
index 31472c9..9c332a1 100644
--- a/docker/test/integration/resources/kafka_broker/Dockerfile
+++ b/docker/test/integration/resources/kafka_broker/Dockerfile
@@ -1,3 +1,3 @@
 FROM wurstmeister/kafka:2.12-2.5.0
-ADD conf/server-ssl.properties $KAFKA_HOME/config/server.properties
+ADD conf/server.properties $KAFKA_HOME/config/server.properties
 ADD conf/ /usr/local/etc/kafka/
diff --git a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties b/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
deleted file mode 100644
index c7158fa..0000000
--- a/docker/test/integration/resources/kafka_broker/conf/server-ssl.properties
+++ /dev/null
@@ -1,155 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# see kafka.server.KafkaConfig for additional details and defaults
-
-############################# Server Basics #############################
-
-# The id of the broker. This must be set to a unique integer for each broker.
-broker.id=0
-
-############################# Socket Server Settings #############################
-
-# The address the socket server listens on. It will get the value returned from
-# java.net.InetAddress.getCanonicalHostName() if not configured.
-#   FORMAT:
-#     listeners = listener_name://host_name:port
-#   EXAMPLE:
-#     listeners = PLAINTEXT://your.host.name:9092
-#listeners=PLAINTEXT://:9092
-
-# Hostname and port the broker will advertise to producers and consumers. If not set,
-# it uses the value for "listeners" if configured.  Otherwise, it will use the value
-# returned from java.net.InetAddress.getCanonicalHostName().
-#advertised.listeners=PLAINTEXT://your.host.name:9092
-
-# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
-#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
-
-# The number of threads that the server uses for receiving requests from the network and sending responses to the network
-num.network.threads=3
-
-# The number of threads that the server uses for processing requests, which may include disk I/O
-num.io.threads=8
-
-# The send buffer (SO_SNDBUF) used by the socket server
-socket.send.buffer.bytes=102400
-
-# The receive buffer (SO_RCVBUF) used by the socket server
-socket.receive.buffer.bytes=102400
-
-# The maximum size of a request that the socket server will accept (protection against OOM)
-socket.request.max.bytes=104857600
-
-
-############################# Log Basics #############################
-
-# A comma separated list of directories under which to store log files
-log.dirs=/tmp/kafka-logs
-
-# The default number of log partitions per topic. More partitions allow greater
-# parallelism for consumption, but this will also result in more files across
-# the brokers.
-num.partitions=1
-
-# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
-# This value is recommended to be increased for installations with data dirs located in RAID array.
-num.recovery.threads.per.data.dir=1
-
-############################# Internal Topic Settings  #############################
-# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
-# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
-offsets.topic.replication.factor=1
-transaction.state.log.replication.factor=1
-transaction.state.log.min.isr=1
-
-############################# Log Flush Policy #############################
-
-# Messages are immediately written to the filesystem but by default we only fsync() to sync
-# the OS cache lazily. The following configurations control the flush of data to disk.
-# There are a few important trade-offs here:
-#    1. Durability: Unflushed data may be lost if you are not using replication.
-#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
-#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
-# The settings below allow one to configure the flush policy to flush data after a period of time or
-# every N messages (or both). This can be done globally and overridden on a per-topic basis.
-
-# The number of messages to accept before forcing a flush of data to disk
-#log.flush.interval.messages=10000
-
-# The maximum amount of time a message can sit in a log before we force a flush
-#log.flush.interval.ms=1000
-
-############################# Log Retention Policy #############################
-
-# The following configurations control the disposal of log segments. The policy can
-# be set to delete segments after a period of time, or after a given size has accumulated.
-# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
-# from the end of the log.
-
-# The minimum age of a log file to be eligible for deletion due to age
-log.retention.hours=168
-
-# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
-# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
-#log.retention.bytes=1073741824
-
-# The maximum size of a log segment file. When this size is reached a new log segment will be created.
-log.segment.bytes=1073741824
-
-# The interval at which log segments are checked to see if they can be deleted according
-# to the retention policies
-log.retention.check.interval.ms=300000
-
-############################# Zookeeper #############################
-
-# Zookeeper connection string (see zookeeper docs for details).
-# This is a comma separated host:port pairs, each corresponding to a zk
-# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
-# You can also append an optional chroot string to the urls to specify the
-# root directory for all kafka znodes.
-zookeeper.connect=localhost:2181
-
-# Timeout in ms for connecting to zookeeper
-zookeeper.connection.timeout.ms=6000
-
-
-############################# Group Coordinator Settings #############################
-
-# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
-# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
-# The default value for this is 3 seconds.
-# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
-# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
-group.initial.rebalance.delay.ms=0
-
-listeners=SSL://kafka-broker:9093,SSL_HOST://0.0.0.0:29093
-advertised.listeners=SSL://kafka-broker:9093,SSL_HOST://localhost:29093
-listener.security.protocol.map=SSL:SSL,SSL_HOST:SSL
-
-# SSL
-ssl.protocol = TLS
-ssl.enabled.protocols=TLSv1.2
-ssl.keystore.type = JKS
-ssl.keystore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
-# ssl.keystore.location = /usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
-ssl.keystore.password = abcdefgh
-ssl.key.password = abcdefgh
-ssl.truststore.type = JKS
-ssl.truststore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
-# ssl.truststore.location = /usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
-ssl.truststore.password = abcdefgh
-# To require authentication of clients use "require", else "none" or "request"
-ssl.client.auth = required
diff --git a/docker/test/integration/resources/kafka_broker/conf/server.properties b/docker/test/integration/resources/kafka_broker/conf/server.properties
index 20d9095..e752fca 100644
--- a/docker/test/integration/resources/kafka_broker/conf/server.properties
+++ b/docker/test/integration/resources/kafka_broker/conf/server.properties
@@ -22,7 +22,7 @@ broker.id=0
 
 ############################# Socket Server Settings #############################
 
-# The address the socket server listens on. It will get the value returned from 
+# The address the socket server listens on. It will get the value returned from
 # java.net.InetAddress.getCanonicalHostName() if not configured.
 #   FORMAT:
 #     listeners = listener_name://host_name:port
@@ -30,7 +30,7 @@ broker.id=0
 #     listeners = PLAINTEXT://your.host.name:9092
 #listeners=PLAINTEXT://:9092
 
-# Hostname and port the broker will advertise to producers and consumers. If not set, 
+# Hostname and port the broker will advertise to producers and consumers. If not set,
 # it uses the value for "listeners" if configured.  Otherwise, it will use the value
 # returned from java.net.InetAddress.getCanonicalHostName().
 #advertised.listeners=PLAINTEXT://your.host.name:9092
@@ -57,7 +57,7 @@ socket.request.max.bytes=104857600
 ############################# Log Basics #############################
 
 # A comma separated list of directories under which to store log files
-log.dirs=/usr/local/var/lib/kafka-logs
+log.dirs=/tmp/kafka-logs
 
 # The default number of log partitions per topic. More partitions allow greater
 # parallelism for consumption, but this will also result in more files across
@@ -133,4 +133,46 @@ zookeeper.connection.timeout.ms=6000
 # The default value for this is 3 seconds.
 # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
 # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
-group.initial.rebalance.delay.ms=0
\ No newline at end of file
+group.initial.rebalance.delay.ms=0
+
+sasl.enabled.mechanisms=PLAIN
+sasl.mechanism.inter.broker.protocol=PLAIN
+confluent.metrics.reporter.sasl.mechanism=PLAIN
+listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_plaintext_host.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_ssl.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+listener.name.sasl_ssl_host.plain.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
+  username="admin" \
+  password="admin-secret" \
+  user_admin="admin-secret" \
+  user_alice="alice-secret";
+
+# SSL
+ssl.protocol = TLS
+ssl.enabled.protocols=TLSv1.2
+ssl.keystore.type = JKS
+ssl.keystore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.keystore.jks
+# ssl.keystore.location = /usr/local/etc/kafka/certs/broker_localhost_server.keystore.jks
+ssl.keystore.password = abcdefgh
+ssl.key.password = abcdefgh
+ssl.truststore.type = JKS
+ssl.truststore.location = /usr/local/etc/kafka/certs/broker_kafka-broker_server.truststore.jks
+# ssl.truststore.location = /usr/local/etc/kafka/certs/broker_localhost_server.truststore.jks
+ssl.truststore.password = abcdefgh
+# To require authentication of clients use "require", else "none" or "request"
+ssl.client.auth = required
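
For orientation, a minimal standalone librdkafka sketch of what the sasl_plaintext listener above expects from clients, using the test credentials defined in the JAAS entries (alice / alice-secret) and the broker port 9094 configured in KafkaBrokerContainer.py; return codes from rd_kafka_conf_set are ignored for brevity.

    #include <cstdio>
    #include <librdkafka/rdkafka.h>

    // Sketch: a standalone librdkafka producer authenticating against the
    // SASL/PLAIN listener configured in the broker properties above.
    int main() {
      char errstr[512];
      rd_kafka_conf_t* conf = rd_kafka_conf_new();
      rd_kafka_conf_set(conf, "bootstrap.servers", "kafka-broker:9094", errstr, sizeof(errstr));
      rd_kafka_conf_set(conf, "security.protocol", "sasl_plaintext", errstr, sizeof(errstr));
      rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr, sizeof(errstr));
      rd_kafka_conf_set(conf, "sasl.username", "alice", errstr, sizeof(errstr));
      rd_kafka_conf_set(conf, "sasl.password", "alice-secret", errstr, sizeof(errstr));

      // rd_kafka_new takes ownership of conf on success.
      rd_kafka_t* producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));
      if (producer == nullptr) {
        std::fprintf(stderr, "failed to create producer: %s\n", errstr);
        return 1;
      }
      rd_kafka_destroy(producer);
      return 0;
    }
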
diff --git a/extensions/librdkafka/ConsumeKafka.cpp b/extensions/librdkafka/ConsumeKafka.cpp
index e27051e..eef2fb4 100644
--- a/extensions/librdkafka/ConsumeKafka.cpp
+++ b/extensions/librdkafka/ConsumeKafka.cpp
@@ -65,13 +65,6 @@ core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty(
   ->isRequired(true)
   ->build());
 
-core::Property ConsumeKafka::SecurityProtocol(core::PropertyBuilder::createProperty("Security Protocol")
-  ->withDescription("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
-  ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT, SECURITY_PROTOCOL_SSL})
-  ->withDefaultValue(SECURITY_PROTOCOL_PLAINTEXT)
-  ->isRequired(true)
-  ->build());
-
 core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
   ->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.")
   ->supportsExpressionLanguage(true)
@@ -168,18 +161,19 @@ core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createPropert
   ->withDefaultValue<core::TimePeriodValue>("60 seconds")
   ->build());
 
-core::Property ConsumeKafka::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("SSL Context Service Name")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
-
 const core::Relationship ConsumeKafka::Success("success", "Incoming Kafka messages as flowfiles. Depending on the demarcation strategy, this can be one or multiple flowfiles per message.");
 
 void ConsumeKafka::initialize() {
   setSupportedProperties({
-    KafkaBrokers,
     SecurityProtocol,
+    SSLContextService,
+    KerberosServiceName,
+    KerberosPrincipal,
+    KerberosKeytabPath,
+    SASLMechanism,
+    Username,
+    Password,
+    KafkaBrokers,
     TopicNames,
     TopicNameFormat,
     HonorTransactions,
@@ -192,8 +186,7 @@ void ConsumeKafka::initialize() {
     DuplicateHeaderHandling,
     MaxPollRecords,
     MaxPollTime,
-    SessionTimeout,
-    SSLContextService
+    SessionTimeout
   });
   setSupportedRelationships({
     Success,
@@ -203,40 +196,22 @@ void ConsumeKafka::initialize() {
 void ConsumeKafka::onSchedule(core::ProcessContext* context, core::ProcessSessionFactory* /* sessionFactory */) {
   gsl_Expects(context);
   // Required properties
-  kafka_brokers_                = utils::getRequiredPropertyOrThrow(context, KafkaBrokers.getName());
-  security_protocol_            = utils::getRequiredPropertyOrThrow(context, SecurityProtocol.getName());
-  topic_names_                  = utils::listFromRequiredCommaSeparatedProperty(context, TopicNames.getName());
-  topic_name_format_            = utils::getRequiredPropertyOrThrow(context, TopicNameFormat.getName());
-  honor_transactions_           = utils::parseBooleanPropertyOrThrow(context, HonorTransactions.getName());
-  group_id_                     = utils::getRequiredPropertyOrThrow(context, GroupID.getName());
-  offset_reset_                 = utils::getRequiredPropertyOrThrow(context, OffsetReset.getName());
-  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(context, KeyAttributeEncoding.getName());
-  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(context, MaxPollTime.getName());
-  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(context, SessionTimeout.getName());
+  kafka_brokers_                = utils::getRequiredPropertyOrThrow(*context, KafkaBrokers.getName());
+  topic_names_                  = utils::listFromRequiredCommaSeparatedProperty(*context, TopicNames.getName());
+  topic_name_format_            = utils::getRequiredPropertyOrThrow(*context, TopicNameFormat.getName());
+  honor_transactions_           = utils::parseBooleanPropertyOrThrow(*context, HonorTransactions.getName());
+  group_id_                     = utils::getRequiredPropertyOrThrow(*context, GroupID.getName());
+  offset_reset_                 = utils::getRequiredPropertyOrThrow(*context, OffsetReset.getName());
+  key_attribute_encoding_       = utils::getRequiredPropertyOrThrow(*context, KeyAttributeEncoding.getName());
+  max_poll_time_milliseconds_   = utils::parseTimePropertyMSOrThrow(*context, MaxPollTime.getName());
+  session_timeout_milliseconds_ = utils::parseTimePropertyMSOrThrow(*context, SessionTimeout.getName());
 
   // Optional properties
   context->getProperty(MessageDemarcator.getName(), message_demarcator_);
   context->getProperty(MessageHeaderEncoding.getName(), message_header_encoding_);
   context->getProperty(DuplicateHeaderHandling.getName(), duplicate_header_handling_);
 
-  std::string ssl_service_name;
-  std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
-  if (context->getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) {
-    std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(ssl_service_name);
-    if (service) {
-      ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-      ssl_data_.ca_loc = ssl_service->getCACertificate();
-      ssl_data_.cert_loc = ssl_service->getCertificateFile();
-      ssl_data_.key_loc = ssl_service->getPrivateKeyFile();
-      ssl_data_.key_pw = ssl_service->getPassphrase();
-    } else {
-      logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", ssl_service_name);
-    }
-  } else if (security_protocol_ == SECURITY_PROTOCOL_SSL) {
-    logger_->log_warn("Security protocol is set to %s, but no valid SSL Context Service property is set.", SECURITY_PROTOCOL_SSL);
-  }
-
-  headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(context, HeadersToAddAsAttributes.getName());
+  headers_to_add_as_attributes_ = utils::listFromCommaSeparatedProperty(*context, HeadersToAddAsAttributes.getName());
   max_poll_records_ = gsl::narrow<std::size_t>(utils::getOptionalUintProperty(*context, MaxPollRecords.getName()).value_or(DEFAULT_MAX_POLL_RECORDS));
 
   if (!utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_UTF_8, key_attribute_encoding_) && !utils::StringUtils::equalsIgnoreCase(KEY_ATTR_ENCODING_HEX, key_attribute_encoding_)) {
@@ -324,7 +299,7 @@ void ConsumeKafka::extend_config_from_dynamic_properties(const core::ProcessCont
   }
 }
 
-void ConsumeKafka::configure_new_connection(const core::ProcessContext& context) {
+void ConsumeKafka::configure_new_connection(core::ProcessContext& context) {
   using utils::setKafkaConfigurationField;
 
   conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };
@@ -342,18 +317,11 @@ void ConsumeKafka::configure_new_connection(const core::ProcessContext& context)
   // logger_->log_info("Enabling all debug logs for kafka consumer.");
   // setKafkaConfigurationField(*conf_, "debug", "all");
 
+  setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));
+
   setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
   setKafkaConfigurationField(*conf_, "allow.auto.create.topics", "true");
   setKafkaConfigurationField(*conf_, "auto.offset.reset", offset_reset_);
-
-  if (security_protocol_ == SECURITY_PROTOCOL_SSL) {
-    setKafkaConfigurationField(*conf_, "security.protocol", "ssl");
-    setKafkaConfigurationField(*conf_, "ssl.ca.location", ssl_data_.ca_loc);
-    setKafkaConfigurationField(*conf_, "ssl.certificate.location", ssl_data_.cert_loc);
-    setKafkaConfigurationField(*conf_, "ssl.key.location", ssl_data_.key_loc);
-    setKafkaConfigurationField(*conf_, "ssl.key.password", ssl_data_.key_pw);
-  }
-
   setKafkaConfigurationField(*conf_, "enable.auto.commit", "false");
   setKafkaConfigurationField(*conf_, "enable.auto.offset.store", "false");
   setKafkaConfigurationField(*conf_, "isolation.level", honor_transactions_ ? "read_committed" : "read_uncommitted");
diff --git a/extensions/librdkafka/ConsumeKafka.h b/extensions/librdkafka/ConsumeKafka.h
index 14a23d0..e8610e0 100644
--- a/extensions/librdkafka/ConsumeKafka.h
+++ b/extensions/librdkafka/ConsumeKafka.h
@@ -23,7 +23,7 @@
 #include <utility>
 #include <vector>
 
-#include "core/Processor.h"
+#include "KafkaProcessorBase.h"
 #include "core/logging/LoggerConfiguration.h"
 #include "rdkafka.h"
 #include "rdkafka_utils.h"
@@ -35,13 +35,12 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-class ConsumeKafka : public core::Processor {
+class ConsumeKafka : public KafkaProcessorBase {
  public:
   EXTENSIONAPI static constexpr char const* ProcessorName = "ConsumeKafka";
 
   // Supported Properties
   EXTENSIONAPI static core::Property KafkaBrokers;
-  EXTENSIONAPI static core::Property SecurityProtocol;
   EXTENSIONAPI static core::Property TopicNames;
   EXTENSIONAPI static core::Property TopicNameFormat;
   EXTENSIONAPI static core::Property HonorTransactions;
@@ -55,7 +54,6 @@ class ConsumeKafka : public core::Processor {
   EXTENSIONAPI static core::Property MaxPollRecords;
   EXTENSIONAPI static core::Property MaxPollTime;
   EXTENSIONAPI static core::Property SessionTimeout;
-  EXTENSIONAPI static core::Property SSLContextService;
 
   // Supported Relationships
   EXTENSIONAPI static const core::Relationship Success;
@@ -98,7 +96,7 @@ class ConsumeKafka : public core::Processor {
   static constexpr const std::size_t METADATA_COMMUNICATIONS_TIMEOUT_MS{ 60000 };
 
   explicit ConsumeKafka(const std::string& name, const utils::Identifier& uuid = utils::Identifier()) :
-      Processor(name, uuid) {}
+      KafkaProcessorBase(name, uuid, core::logging::LoggerFactory<ConsumeKafka>::getLogger()) {}
 
   ~ConsumeKafka() override = default;
 
@@ -126,7 +124,7 @@ class ConsumeKafka : public core::Processor {
  private:
   void create_topic_partition_list();
   void extend_config_from_dynamic_properties(const core::ProcessContext& context);
-  void configure_new_connection(const core::ProcessContext& context);
+  void configure_new_connection(core::ProcessContext& context);
   std::string extract_message(const rd_kafka_message_t& rkmessage) const;
   std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> poll_kafka_messages();
   utils::KafkaEncoding key_attr_encoding_attr_to_enum() const;
@@ -155,7 +153,6 @@ class ConsumeKafka : public core::Processor {
   }
 
   std::string kafka_brokers_;
-  std::string security_protocol_;
   std::vector<std::string> topic_names_;
   std::string topic_name_format_;
   bool honor_transactions_;
@@ -170,8 +167,6 @@ class ConsumeKafka : public core::Processor {
   std::chrono::milliseconds max_poll_time_milliseconds_;
   std::chrono::milliseconds session_timeout_milliseconds_;
 
-  utils::SSL_data ssl_data_;
-
   std::unique_ptr<rd_kafka_t, utils::rd_kafka_consumer_deleter> consumer_;
   std::unique_ptr<rd_kafka_conf_t, utils::rd_kafka_conf_deleter> conf_;
   std::unique_ptr<rd_kafka_topic_partition_list_t, utils::rd_kafka_topic_partition_list_deleter> kf_topic_partition_list_;
@@ -181,8 +176,6 @@ class ConsumeKafka : public core::Processor {
   std::vector<std::unique_ptr<rd_kafka_message_t, utils::rd_kafka_message_deleter>> pending_messages_;
 
   std::mutex do_not_call_on_trigger_concurrently_;
-
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ConsumeKafka>::getLogger()};
 };
 
 }  // namespace processors
diff --git a/extensions/librdkafka/KafkaProcessorBase.cpp b/extensions/librdkafka/KafkaProcessorBase.cpp
new file mode 100644
index 0000000..d54404c
--- /dev/null
+++ b/extensions/librdkafka/KafkaProcessorBase.cpp
@@ -0,0 +1,135 @@
+/**
+ * @file KafkaProcessorBase.cpp
+ * KafkaProcessorBase class implementation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "KafkaProcessorBase.h"
+
+#include "rdkafka_utils.h"
+#include "utils/ProcessorConfigUtils.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+const core::Property KafkaProcessorBase::SecurityProtocol(
+        core::PropertyBuilder::createProperty("Security Protocol")
+        ->withDescription("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
+        ->withDefaultValue<std::string>(toString(SecurityProtocolOption::PLAINTEXT))
+        ->withAllowableValues<std::string>(SecurityProtocolOption::values())
+        ->isRequired(true)
+        ->build());
+const core::Property KafkaProcessorBase::SSLContextService(
+    core::PropertyBuilder::createProperty("SSL Context Service")
+        ->withDescription("SSL Context Service Name")
+        ->asType<minifi::controllers::SSLContextService>()
+        ->build());
+const core::Property KafkaProcessorBase::KerberosServiceName(
+    core::PropertyBuilder::createProperty("Kerberos Service Name")
+        ->withDescription("Kerberos Service Name")
+        ->build());
+const core::Property KafkaProcessorBase::KerberosPrincipal(
+    core::PropertyBuilder::createProperty("Kerberos Principal")
+        ->withDescription("Kerberos Principal")
+        ->build());
+const core::Property KafkaProcessorBase::KerberosKeytabPath(
+    core::PropertyBuilder::createProperty("Kerberos Keytab Path")
+        ->withDescription("The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.")
+        ->build());
+const core::Property KafkaProcessorBase::SASLMechanism(
+        core::PropertyBuilder::createProperty("SASL Mechanism")
+        ->withDescription("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
+        ->withDefaultValue<std::string>(toString(SASLMechanismOption::GSSAPI))
+        ->withAllowableValues<std::string>(SASLMechanismOption::values())
+        ->isRequired(true)
+        ->build());
+const core::Property KafkaProcessorBase::Username(
+    core::PropertyBuilder::createProperty("Username")
+        ->withDescription("The username when the SASL Mechanism is PLAIN")
+        ->build());
+const core::Property KafkaProcessorBase::Password(
+    core::PropertyBuilder::createProperty("Password")
+        ->withDescription("The password for the given username when the SASL Mechanism is PLAIN")
+        ->build());
+
+std::optional<utils::SSL_data> KafkaProcessorBase::getSslData(core::ProcessContext& context) const {
+  std::string ssl_service_name;
+  if (context.getProperty(SSLContextService.getName(), ssl_service_name) && !ssl_service_name.empty()) {
+    std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(ssl_service_name);
+    if (service) {
+      auto ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
+      utils::SSL_data ssl_data;
+      ssl_data.ca_loc = ssl_service->getCACertificate();
+      ssl_data.cert_loc = ssl_service->getCertificateFile();
+      ssl_data.key_loc = ssl_service->getPrivateKeyFile();
+      ssl_data.key_pw = ssl_service->getPassphrase();
+      return ssl_data;
+    } else {
+      logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", ssl_service_name);
+      return std::nullopt;
+    }
+  } else if (security_protocol_ == SecurityProtocolOption::SSL || security_protocol_ == SecurityProtocolOption::SASL_SSL) {
+    logger_->log_warn("Security protocol is set to %s, but no valid SSL Context Service property is set.", security_protocol_.toString());
+  }
+
+  return std::nullopt;
+}
+
+void KafkaProcessorBase::setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config) {
+  security_protocol_ = utils::getRequiredPropertyOrThrow<SecurityProtocolOption>(context, SecurityProtocol.getName());
+  utils::setKafkaConfigurationField(*config, "security.protocol", security_protocol_.toString());
+  logger_->log_debug("Kafka security.protocol [%s]", security_protocol_.toString());
+  if (security_protocol_ == SecurityProtocolOption::SSL || security_protocol_ == SecurityProtocolOption::SASL_SSL) {
+    auto ssl_data = getSslData(context);
+    if (ssl_data) {
+      if (ssl_data->ca_loc.empty() && ssl_data->cert_loc.empty() && ssl_data->key_loc.empty() && ssl_data->key_pw.empty()) {
+        logger_->log_warn("Security protocol is set to %s, but no valid security parameters are set in the properties or in the SSL Context Service.", security_protocol_.toString());
+      } else {
+        utils::setKafkaConfigurationField(*config, "ssl.ca.location", ssl_data->ca_loc);
+        logger_->log_debug("Kafka ssl.ca.location [%s]", ssl_data->ca_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.certificate.location", ssl_data->cert_loc);
+        logger_->log_debug("Kafka ssl.certificate.location [%s]", ssl_data->cert_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.key.location", ssl_data->key_loc);
+        logger_->log_debug("Kafka ssl.key.location [%s]", ssl_data->key_loc);
+        utils::setKafkaConfigurationField(*config, "ssl.key.password", ssl_data->key_pw);
+        logger_->log_debug("Kafka ssl.key.password was set");
+      }
+    }
+  }
+
+  auto sasl_mechanism = utils::getRequiredPropertyOrThrow<SASLMechanismOption>(context, SASLMechanism.getName());
+  utils::setKafkaConfigurationField(*config, "sasl.mechanism", sasl_mechanism.toString());
+  logger_->log_debug("Kafka sasl.mechanism [%s]", sasl_mechanism.toString());
+
+  auto setKafkaConfigIfNotEmpty = [this, &context, config](const std::string& property_name, const std::string& kafka_config_name, bool log_value = true) {
+    std::string value;
+    if (context.getProperty(property_name, value) && !value.empty()) {
+      utils::setKafkaConfigurationField(*config, kafka_config_name, value);
+      if (log_value) {
+        logger_->log_debug("Kafka %s [%s]", kafka_config_name, value);
+      } else {
+        logger_->log_debug("Kafka %s was set", kafka_config_name);
+      }
+    }
+  };
+
+  setKafkaConfigIfNotEmpty(KerberosServiceName.getName(), "sasl.kerberos.service.name");
+  setKafkaConfigIfNotEmpty(KerberosPrincipal.getName(), "sasl.kerberos.principal");
+  setKafkaConfigIfNotEmpty(KerberosKeytabPath.getName(), "sasl.kerberos.keytab");
+  setKafkaConfigIfNotEmpty(Username.getName(), "sasl.username");
+  setKafkaConfigIfNotEmpty(Password.getName(), "sasl.password", false);
+}
+
+}  // namespace org::apache::nifi::minifi::processors
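
Derived processors reuse this by listing the shared properties in initialize() and calling setKafkaAuthenticationParameters() while building their rd_kafka_conf_t, as the ConsumeKafka changes above show. A condensed, hypothetical sketch of the pattern (MyKafkaProcessor and configureConnection are illustrative names only):

    // Hypothetical sketch of a processor derived from KafkaProcessorBase,
    // mirroring ConsumeKafka::configure_new_connection() earlier in this commit.
    void MyKafkaProcessor::configureConnection(core::ProcessContext& context) {
      conf_ = { rd_kafka_conf_new(), utils::rd_kafka_conf_deleter() };

      // Reads Security Protocol, SASL Mechanism, Username/Password, the Kerberos
      // properties and the SSL Context Service, and applies them to the config.
      setKafkaAuthenticationParameters(context, gsl::make_not_null(conf_.get()));

      // Processor-specific settings follow, e.g.:
      utils::setKafkaConfigurationField(*conf_, "bootstrap.servers", kafka_brokers_);
    }
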
diff --git a/extensions/librdkafka/KafkaProcessorBase.h b/extensions/librdkafka/KafkaProcessorBase.h
new file mode 100644
index 0000000..6879de2
--- /dev/null
+++ b/extensions/librdkafka/KafkaProcessorBase.h
@@ -0,0 +1,69 @@
+/**
+ * @file KafkaProcessorBase.h
+ * KafkaProcessorBase class declaration
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <memory>
+#include <string>
+
+#include "core/Processor.h"
+#include "rdkafka_utils.h"
+#include "utils/Enum.h"
+
+namespace org::apache::nifi::minifi::processors {
+
+// Base class collecting the security/authentication properties shared by the librdkafka processors (PublishKafka, ConsumeKafka)
+class KafkaProcessorBase : public core::Processor {
+ public:
+  EXTENSIONAPI static const core::Property SSLContextService;
+  EXTENSIONAPI static const core::Property SecurityProtocol;
+  EXTENSIONAPI static const core::Property KerberosServiceName;
+  EXTENSIONAPI static const core::Property KerberosPrincipal;
+  EXTENSIONAPI static const core::Property KerberosKeytabPath;
+  EXTENSIONAPI static const core::Property SASLMechanism;
+  EXTENSIONAPI static const core::Property Username;
+  EXTENSIONAPI static const core::Property Password;
+
+  SMART_ENUM(SecurityProtocolOption,
+    (PLAINTEXT, "plaintext"),
+    (SSL, "ssl"),
+    (SASL_PLAIN, "sasl_plaintext"),
+    (SASL_SSL, "sasl_ssl")
+  )
+
+  SMART_ENUM(SASLMechanismOption,
+    (GSSAPI, "GSSAPI"),
+    (PLAIN, "PLAIN")
+  )
+
+  KafkaProcessorBase(const std::string& name, const utils::Identifier& uuid, std::shared_ptr<core::logging::Logger> logger)
+      : core::Processor(name, uuid),
+        logger_(logger) {
+  }
+
+ protected:
+  virtual std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) const;
+  void setKafkaAuthenticationParameters(core::ProcessContext& context, gsl::not_null<rd_kafka_conf_t*> config);
+
+  SecurityProtocolOption security_protocol_;
+  std::shared_ptr<core::logging::Logger> logger_;
+};
+
+}  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/librdkafka/PublishKafka.cpp b/extensions/librdkafka/PublishKafka.cpp
index cae955f..f021bed 100644
--- a/extensions/librdkafka/PublishKafka.cpp
+++ b/extensions/librdkafka/PublishKafka.cpp
@@ -41,23 +41,9 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-#define COMPRESSION_CODEC_NONE "none"
-#define COMPRESSION_CODEC_GZIP "gzip"
-#define COMPRESSION_CODEC_SNAPPY "snappy"
-#define ROUND_ROBIN_PARTITIONING "Round Robin"
-#define RANDOM_PARTITIONING "Random Robin"
-#define USER_DEFINED_PARTITIONING "User-Defined"
-#define DELIVERY_REPLICATED "all"
-#define DELIVERY_ONE_NODE "1"
-#define DELIVERY_BEST_EFFORT "0"
-#define SECURITY_PROTOCOL_PLAINTEXT "plaintext"
-#define SECURITY_PROTOCOL_SSL "ssl"
-#define KAFKA_KEY_ATTRIBUTE "kafka.key"
-
 const core::Property PublishKafka::SeedBrokers(
     core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
-
 const core::Property PublishKafka::Topic(
     core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
         ->isRequired(true)->supportsExpressionLanguage(true)->build());
@@ -68,7 +54,6 @@ const core::Property PublishKafka::DeliveryGuarantee(
                                                                                  "-1 or all (block until message is committed by all in sync replicas) "
                                                                                  "or any concrete number of nodes.")
         ->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
-
 const core::Property PublishKafka::MaxMessageSize(
     core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
         ->isRequired(false)->build());
@@ -118,33 +103,14 @@ const core::Property PublishKafka::CompressCodec(
         ->withAllowableValues<std::string>({COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY})
         ->withDescription("compression codec to use for compressing message sets")
         ->build());
-
 const core::Property PublishKafka::MaxFlowSegSize(
     core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
         ->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
-const core::Property PublishKafka::SecurityProtocol(
-        core::PropertyBuilder::createProperty("Security Protocol")
-        ->withDescription("Protocol used to communicate with brokers")
-        ->withDefaultValue<std::string>(SECURITY_PROTOCOL_PLAINTEXT)
-        ->withAllowableValues<std::string>({SECURITY_PROTOCOL_PLAINTEXT, SECURITY_PROTOCOL_SSL})
-        ->isRequired(true)
-        ->build());
-
-const core::Property PublishKafka::SSLContextService(
-    core::PropertyBuilder::createProperty("SSL Context Service")
-        ->withDescription("SSL Context Service Name")
-        ->asType<minifi::controllers::SSLContextService>()
-        ->build());
 
 const core::Property PublishKafka::SecurityCA("Security CA", "DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key", "");
 const core::Property PublishKafka::SecurityCert("Security Cert", "DEPRECATED in favor of SSL Context Service. Path to client's public key (PEM) used for authentication", "");
 const core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "DEPRECATED in favor of SSL Context Service. Path to client's private key (PEM) used for authentication", "");
 const core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "DEPRECATED in favor of SSL Context Service. Private key passphrase", "");
-const core::Property PublishKafka::KerberosServiceName("Kerberos Service Name", "Kerberos Service Name", "");
-const core::Property PublishKafka::KerberosPrincipal("Kerberos Principal", "Keberos Principal", "");
-const core::Property PublishKafka::KerberosKeytabPath("Kerberos Keytab Path",
-                                                "The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.", "");
-
 const core::Property PublishKafka::KafkaKey(
     core::PropertyBuilder::createProperty("Kafka Key")
         ->withDescription("The key to use for the message. If not specified, the UUID of the flow file is used as the message key.")
@@ -475,41 +441,44 @@ void messageDeliveryCallback(rd_kafka_t* rk, const rd_kafka_message_t* rkmessage
 
 void PublishKafka::initialize() {
   // Set the supported properties
-  std::set<core::Property> properties;
-  properties.insert(SeedBrokers);
-  properties.insert(Topic);
-  properties.insert(DeliveryGuarantee);
-  properties.insert(MaxMessageSize);
-  properties.insert(RequestTimeOut);
-  properties.insert(MessageTimeOut);
-  properties.insert(ClientName);
-  properties.insert(AttributeNameRegex);
-  properties.insert(BatchSize);
-  properties.insert(TargetBatchPayloadSize);
-  properties.insert(QueueBufferMaxTime);
-  properties.insert(QueueBufferMaxSize);
-  properties.insert(QueueBufferMaxMessage);
-  properties.insert(CompressCodec);
-  properties.insert(MaxFlowSegSize);
-  properties.insert(SecurityProtocol);
-  properties.insert(SSLContextService);
-  properties.insert(SecurityCA);
-  properties.insert(SecurityCert);
-  properties.insert(SecurityPrivateKey);
-  properties.insert(SecurityPrivateKeyPassWord);
-  properties.insert(KerberosServiceName);
-  properties.insert(KerberosPrincipal);
-  properties.insert(KerberosKeytabPath);
-  properties.insert(KafkaKey);
-  properties.insert(MessageKeyField);
-  properties.insert(DebugContexts);
-  properties.insert(FailEmptyFlowFiles);
-  setSupportedProperties(properties);
+  setSupportedProperties({
+    SeedBrokers,
+    Topic,
+    DeliveryGuarantee,
+    MaxMessageSize,
+    RequestTimeOut,
+    MessageTimeOut,
+    ClientName,
+    AttributeNameRegex,
+    BatchSize,
+    TargetBatchPayloadSize,
+    QueueBufferMaxTime,
+    QueueBufferMaxSize,
+    QueueBufferMaxMessage,
+    CompressCodec,
+    MaxFlowSegSize,
+    SecurityProtocol,
+    SSLContextService,
+    SecurityCA,
+    SecurityCert,
+    SecurityPrivateKey,
+    SecurityPrivateKeyPassWord,
+    KerberosServiceName,
+    KerberosPrincipal,
+    KerberosKeytabPath,
+    KafkaKey,
+    MessageKeyField,
+    DebugContexts,
+    FailEmptyFlowFiles,
+    SASLMechanism,
+    Username,
+    Password
+  });
   // Set the supported relationships
-  std::set<core::Relationship> relationships;
-  relationships.insert(Failure);
-  relationships.insert(Success);
-  setSupportedRelationships(relationships);
+  setSupportedRelationships({
+    Success,
+    Failure
+  });
 }
 
 void PublishKafka::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory>& /*sessionFactory*/) {
@@ -622,33 +591,7 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context->getProperty(KerberosServiceName.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.service.name", value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.service.name [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
-  value = "";
-  if (context->getProperty(KerberosPrincipal.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.principal", value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.principal [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
-  value = "";
-  if (context->getProperty(KerberosKeytabPath.getName(), value) && !value.empty()) {
-    result = rd_kafka_conf_set(conf_.get(), "sasl.kerberos.keytab", value.c_str(), errstr.data(), errstr.size());
-    logger_->log_debug("PublishKafka: sasl.kerberos.keytab [%s]", value);
-    if (result != RD_KAFKA_CONF_OK) {
-      auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
+
   value = "";
   if (context->getProperty(MaxMessageSize.getName(), value) && !value.empty()) {
     result = rd_kafka_conf_set(conf_.get(), "message.max.bytes", value.c_str(), errstr.data(), errstr.size());
@@ -709,97 +652,8 @@ bool PublishKafka::configureNewConnection(const std::shared_ptr<core::ProcessCon
       throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
     }
   }
-  value = "";
-  if (context->getProperty(SecurityProtocol.getName(), value) && !value.empty()) {
-    if (value == SECURITY_PROTOCOL_SSL) {
-      result = rd_kafka_conf_set(conf_.get(), "security.protocol", value.c_str(), errstr.data(), errstr.size());
-      logger_->log_debug("PublishKafka: security.protocol [%s]", value);
-      if (result != RD_KAFKA_CONF_OK) {
-        auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-        throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-      }
-
-      std::shared_ptr<minifi::controllers::SSLContextService> ssl_service;
-      if (context->getProperty(SSLContextService.getName(), value) && !value.empty()) {
-        std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
-        if (service) {
-          ssl_service = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
-        } else {
-          logger_->log_warn("SSL Context Service property is set to '%s', but the controller service could not be found.", value);
-        }
-      }
-
-      std::string security_ca;
-      if (ssl_service) {
-        security_ca = ssl_service->getCACertificate();
-      } else {
-        context->getProperty(SecurityCA.getName(), security_ca);
-      }
-
-      std::string security_cert;
-      if (ssl_service) {
-        security_cert = ssl_service->getCertificateFile();
-      } else {
-        context->getProperty(SecurityCert.getName(), security_cert);
-      }
-
-      std::string security_private_key;
-      if (ssl_service) {
-        security_private_key = ssl_service->getPrivateKeyFile();
-      } else {
-        context->getProperty(SecurityPrivateKey.getName(), security_private_key);
-      }
-
-      std::string security_private_key_password;
-      if (ssl_service) {
-        security_private_key_password = ssl_service->getPassphrase();
-      } else {
-        context->getProperty(SecurityPrivateKeyPassWord.getName(), security_private_key_password);
-      }
-
-      if (!security_ca.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.ca.location", security_ca.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.ca.location [%s]", security_ca);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_cert.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.certificate.location", security_cert.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.certificate.location [%s]", security_cert);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_private_key.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.key.location", security_private_key.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.key.location [%s]", security_private_key);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
-      if (!security_private_key_password.empty()) {
-        result = rd_kafka_conf_set(conf_.get(), "ssl.key.password", security_private_key_password.c_str(), errstr.data(), errstr.size());
-        logger_->log_debug("PublishKafka: ssl.key.password [%s]", security_private_key_password);
-        if (result != RD_KAFKA_CONF_OK) {
-          auto error_msg = utils::StringUtils::join_pack(PREFIX_ERROR_MSG, errstr.data());
-          throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-        }
-      }
 
-      if (security_ca.empty() && security_cert.empty() && security_private_key.empty() && security_private_key_password.empty()) {
-        logger_->log_warn("Security protocol is set to %s, but no valid security parameters are set in the properties or in the SSL Context Service.", SECURITY_PROTOCOL_SSL);
-      }
-    } else if (value == SECURITY_PROTOCOL_PLAINTEXT) {
-      // Do nothing
-    } else {
-      auto error_msg = utils::StringUtils::join_pack("PublishKafka: unknown Security Protocol: ", value);
-      throw Exception(PROCESS_SCHEDULE_EXCEPTION, error_msg);
-    }
-  }
+  setKafkaAuthenticationParameters(*context, gsl::make_not_null(conf_.get()));
 
   // Add all of the dynamic properties as librdkafka configurations
   const auto &dynamic_prop_keys = context->getDynamicPropertyKeys();
@@ -919,6 +773,19 @@ bool PublishKafka::createNewTopic(const std::shared_ptr<core::ProcessContext> &c
   return true;
 }
 
+std::optional<utils::SSL_data> PublishKafka::getSslData(core::ProcessContext& context) const {
+  if (auto result = KafkaProcessorBase::getSslData(context); result) {
+    return result;
+  }
+
+  utils::SSL_data ssl_data;
+  context.getProperty(SecurityCA.getName(), ssl_data.ca_loc);
+  context.getProperty(SecurityCert.getName(), ssl_data.cert_loc);
+  context.getProperty(SecurityPrivateKey.getName(), ssl_data.key_loc);
+  context.getProperty(SecurityPrivateKeyPassWord.getName(), ssl_data.key_pw);
+  return ssl_data;
+}
+
 void PublishKafka::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
   // Check whether we have been interrupted
   if (interrupted_) {
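
For reference (not part of the patch), the shared helper boils down to roughly the following librdkafka settings when Security Protocol is sasl_ssl with the PLAIN mechanism; the key names follow librdkafka's CONFIGURATION.md and the credentials are placeholders:

    #include <array>
    #include <librdkafka/rdkafka.h>

    void configure_sasl_ssl_example(rd_kafka_conf_t* conf) {
      std::array<char, 512> errstr{};
      rd_kafka_conf_set(conf, "security.protocol", "sasl_ssl", errstr.data(), errstr.size());
      rd_kafka_conf_set(conf, "sasl.mechanism", "PLAIN", errstr.data(), errstr.size());
      rd_kafka_conf_set(conf, "sasl.username", "example-user", errstr.data(), errstr.size());
      rd_kafka_conf_set(conf, "sasl.password", "example-secret", errstr.data(), errstr.size());
      // ssl.ca.location / ssl.certificate.location / ssl.key.location are still
      // filled from the SSL Context Service (or the deprecated Security CA/Cert/
      // Private Key properties), as in the getSslData() override above.
    }
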
diff --git a/extensions/librdkafka/PublishKafka.h b/extensions/librdkafka/PublishKafka.h
index c2a8197..3108c54 100644
--- a/extensions/librdkafka/PublishKafka.h
+++ b/extensions/librdkafka/PublishKafka.h
@@ -32,9 +32,9 @@
 #include <vector>
 #include <regex>
 
+#include "KafkaProcessorBase.h"
 #include "utils/GeneralUtils.h"
 #include "FlowFileRecord.h"
-#include "core/Processor.h"
 #include "core/ProcessSession.h"
 #include "core/Core.h"
 #include "core/Property.h"
@@ -51,7 +51,7 @@ namespace minifi {
 namespace processors {
 
 // PublishKafka Class
-class PublishKafka : public core::Processor {
+class PublishKafka : public KafkaProcessorBase {
  public:
   static constexpr char const* ProcessorName = "PublishKafka";
 
@@ -71,15 +71,10 @@ class PublishKafka : public core::Processor {
   EXTENSIONAPI static const core::Property QueueBufferMaxMessage;
   EXTENSIONAPI static const core::Property CompressCodec;
   EXTENSIONAPI static const core::Property MaxFlowSegSize;
-  EXTENSIONAPI static const core::Property SSLContextService;
-  EXTENSIONAPI static const core::Property SecurityProtocol;
   EXTENSIONAPI static const core::Property SecurityCA;
   EXTENSIONAPI static const core::Property SecurityCert;
   EXTENSIONAPI static const core::Property SecurityPrivateKey;
   EXTENSIONAPI static const core::Property SecurityPrivateKeyPassWord;
-  EXTENSIONAPI static const core::Property KerberosServiceName;
-  EXTENSIONAPI static const core::Property KerberosPrincipal;
-  EXTENSIONAPI static const core::Property KerberosKeytabPath;
   EXTENSIONAPI static const core::Property KafkaKey;
   EXTENSIONAPI static const core::Property MessageKeyField;
   EXTENSIONAPI static const core::Property DebugContexts;
@@ -89,8 +84,19 @@ class PublishKafka : public core::Processor {
   EXTENSIONAPI static const core::Relationship Failure;
   EXTENSIONAPI static const core::Relationship Success;
 
+  static constexpr const char* COMPRESSION_CODEC_NONE = "none";
+  static constexpr const char* COMPRESSION_CODEC_GZIP = "gzip";
+  static constexpr const char* COMPRESSION_CODEC_SNAPPY = "snappy";
+  static constexpr const char* ROUND_ROBIN_PARTITIONING = "Round Robin";
+  static constexpr const char* RANDOM_PARTITIONING = "Random Robin";
+  static constexpr const char* USER_DEFINED_PARTITIONING = "User-Defined";
+  static constexpr const char* DELIVERY_REPLICATED = "all";
+  static constexpr const char* DELIVERY_ONE_NODE = "1";
+  static constexpr const char* DELIVERY_BEST_EFFORT = "0";
+  static constexpr const char* KAFKA_KEY_ATTRIBUTE = "kafka.key";
+
   explicit PublishKafka(const std::string& name, const utils::Identifier& uuid = {})
-      : core::Processor(name, uuid) {
+      : KafkaProcessorBase(name, uuid, core::logging::LoggerFactory<PublishKafka>::getLogger()) {
   }
 
   ~PublishKafka() override = default;
@@ -113,14 +119,13 @@ class PublishKafka : public core::Processor {
  protected:
   bool configureNewConnection(const std::shared_ptr<core::ProcessContext> &context);
   bool createNewTopic(const std::shared_ptr<core::ProcessContext> &context, const std::string& topic_name, const std::shared_ptr<core::FlowFile>& flow_file);
+  std::optional<utils::SSL_data> getSslData(core::ProcessContext& context) const override;
 
  private:
   core::annotation::Input getInputRequirement() const override {
     return core::annotation::Input::INPUT_REQUIRED;
   }
 
-  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<PublishKafka>::getLogger()};
-
   KafkaConnectionKey key_;
   std::unique_ptr<KafkaConnection> conn_;
   std::mutex connection_mutex_;
diff --git a/libminifi/include/utils/ProcessorConfigUtils.h b/libminifi/include/utils/ProcessorConfigUtils.h
index acf7fd5..9f28de8 100644
--- a/libminifi/include/utils/ProcessorConfigUtils.h
+++ b/libminifi/include/utils/ProcessorConfigUtils.h
@@ -30,11 +30,19 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name);
-std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name);
-std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name);
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name);
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name);
+template<typename PropertyType = std::string>
+PropertyType getRequiredPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name) {
+  PropertyType value;
+  if (!context.getProperty(property_name, value)) {
+    throw std::runtime_error(property_name + " property missing or invalid");
+  }
+  return value;
+}
+
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name);
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name);
+bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name);
+std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, const std::string& property_name);
 std::optional<uint64_t> getOptionalUintProperty(const core::ProcessContext& context, const std::string& property_name);
 std::string parsePropertyWithAllowableValuesOrThrow(const core::ProcessContext& context, const std::string& property_name, const std::set<std::string>& allowable_values);
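
A short usage sketch (not part of the patch) of the reworked helpers; the property names are placeholders, and the non-string overload assumes ProcessContext::getProperty can convert the stored value to the requested type:

    // Inside a processor's onSchedule, with core::ProcessContext& context available:
    const auto topic = utils::getRequiredPropertyOrThrow(context, "Topic Name");  // std::string by default
    const auto batch_size = utils::getRequiredPropertyOrThrow<uint64_t>(context, "Batch Size");
    const bool honor_transactions = utils::parseBooleanPropertyOrThrow(context, "Honor Transactions");
    const auto poll_time = utils::parseTimePropertyMSOrThrow(context, "Max Poll Time");
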
 
diff --git a/libminifi/src/utils/ProcessorConfigUtils.cpp b/libminifi/src/utils/ProcessorConfigUtils.cpp
index a423d87..93b4ec8 100644
--- a/libminifi/src/utils/ProcessorConfigUtils.cpp
+++ b/libminifi/src/utils/ProcessorConfigUtils.cpp
@@ -28,25 +28,17 @@ namespace nifi {
 namespace minifi {
 namespace utils {
 
-std::string getRequiredPropertyOrThrow(const core::ProcessContext* context, const std::string& property_name) {
-  std::string value;
-  if (!context->getProperty(property_name, value)) {
-    throw std::runtime_error(property_name + " property missing or invalid");
-  }
-  return value;
-}
-
-std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+std::vector<std::string> listFromCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name) {
   std::string property_string;
-  context->getProperty(property_name, property_string);
+  context.getProperty(property_name, property_string);
   return utils::StringUtils::splitAndTrim(property_string, ",");
 }
 
-std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext* context, const std::string& property_name) {
+std::vector<std::string> listFromRequiredCommaSeparatedProperty(const core::ProcessContext& context, const std::string& property_name) {
   return utils::StringUtils::splitAndTrim(getRequiredPropertyOrThrow(context, property_name), ",");
 }
 
-bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::string& property_name) {
+bool parseBooleanPropertyOrThrow(const core::ProcessContext& context, const std::string& property_name) {
   const std::string value_str = getRequiredPropertyOrThrow(context, property_name);
   const auto maybe_value = utils::StringUtils::toBool(value_str);
   if (!maybe_value) {
@@ -55,7 +47,7 @@ bool parseBooleanPropertyOrThrow(core::ProcessContext* context, const std::strin
   return maybe_value.value();
 }
 
-std::chrono::milliseconds parseTimePropertyMSOrThrow(core::ProcessContext* context, const std::string& property_name) {
+std::chrono::milliseconds parseTimePropertyMSOrThrow(const core::ProcessContext& context, const std::string& property_name) {
   core::TimeUnit unit;
   uint64_t time_value_ms;
   const std::string value_str = getRequiredPropertyOrThrow(context, property_name);