Posted to commits@nifi.apache.org by ma...@apache.org on 2022/11/30 18:23:29 UTC

[nifi] branch main updated: NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 282c56b5ce NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)
282c56b5ce is described below

commit 282c56b5ceca47caf711898c5929e35b964bb7fe
Author: greyp9 <gr...@users.noreply.github.com>
AuthorDate: Wed Nov 30 13:23:23 2022 -0500

    NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)
---
 .../processors/kafka/pubsub/PublisherLease.java    |  3 +-
 .../additionalDetails.html                         |  2 +-
 .../pubsub/TestPublishKafkaMockParameterized.java  | 97 +++++++++-------------
 ...> TestPublishKafkaRecordMockParameterized.java} | 49 +++++++++--
 .../Publish/parameterized/flowfileInput1.json      |  8 ++
 .../Publish/parameterized/flowfileInputA.json      | 12 +++
 .../Publish/parameterized/kafkaOutput1A.json       | 18 ++++
 .../Publish/parameterized/kafkaOutput1B.json       | 18 ++++
 .../Publish/parameterized/kafkaOutputA1.json       | 22 +++++
 .../Publish/parameterized/kafkaOutputA2.json       | 22 +++++
 .../parameterized/flowfileInputDoc1V.json          |  8 ++
 .../parameterized/flowfileInputDoc1W.json          | 15 ++++
 .../parameterized/flowfileInputDoc2W.json          | 15 ++++
 .../parameterized/kafkaOutputDoc1V.json            | 21 +++++
 .../parameterized/kafkaOutputDoc1W.json            | 18 ++++
 .../parameterized/kafkaOutputDoc2W.json            | 16 ++++
 16 files changed, 275 insertions(+), 69 deletions(-)

diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 0949e537b6..da37db0319 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -431,7 +431,8 @@ public class PublisherLease implements Closeable {
     }
 
     protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
-        publish(flowFile, Collections.emptyList(), messageKey, messageContent, topic, tracker, partition);
+        final List<Header> headers = toHeaders(flowFile, Collections.emptyMap());
+        publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition);
     }
 
     protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent,
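
Before this change, the single-message publish overload passed Collections.emptyList() for headers, so attributes matching the attribute-name-regex property were never attached to the ProducerRecord on the non-record PublishKafka path; delegating to toHeaders(flowFile, Collections.emptyMap()) restores the attribute-to-header mapping. Below is a minimal sketch of that derivation, assuming it keys off the configured attribute-name pattern (the class and helper names are illustrative, not the actual PublisherLease API):

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.internals.RecordHeader;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.regex.Pattern;

    // Sketch only (not the actual PublisherLease code): derive Kafka record headers
    // from flowfile attributes whose names match the configured attribute-name-regex.
    final class AttributeHeaderSketch {
        static List<Header> toHeadersSketch(final Map<String, String> flowFileAttributes,
                                            final Pattern attributeNamePattern,
                                            final Charset headerCharset) {
            final List<Header> headers = new ArrayList<>();
            if (attributeNamePattern == null) {
                return headers;
            }
            for (final Map.Entry<String, String> entry : flowFileAttributes.entrySet()) {
                if (attributeNamePattern.matcher(entry.getKey()).matches()) {
                    headers.add(new RecordHeader(entry.getKey(), entry.getValue().getBytes(headerCharset)));
                }
            }
            return headers;
        }

        public static void main(final String[] args) {
            // Mirrors the ".*A." test parameter: only the matching attribute becomes a header.
            final Map<String, String> attributes = Map.of("attrKeyA1", "attrValueA1", "unrelated", "x");
            final List<Header> headers = toHeadersSketch(attributes, Pattern.compile(".*A."), StandardCharsets.UTF_8);
            headers.forEach(h -> System.out.println(h.key() + "=" + new String(h.value(), StandardCharsets.UTF_8)));
        }
    }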
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html
index ce87abc73d..e125b0f067 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html
@@ -375,7 +375,7 @@
             <table border="thin">
                 <tr>
                     <th>Record Key</th>
-                    <td><code>Acme Accounts</code></td>
+                    <td><code>Acme Holdings</code></td>
                 </tr>
                 <tr>
                     <th>Record Value</th>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
index 4e97a6da84..b15f5d55a1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
@@ -31,17 +31,12 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
 import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.nifi.json.JsonRecordSetWriter;
-import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.kafka.shared.property.PublishStrategy;
 import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -57,12 +52,15 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
@@ -79,30 +77,29 @@ public class TestPublishKafkaMockParameterized {
 
     public static Stream<Arguments> testCaseParametersProvider() {
         return Stream.of(
-                arguments("PublishRecord/parameterized/flowfileInput1.json",
-                        "account", ".*A.", getAttributes(), PublishStrategy.USE_VALUE,
-                        "PublishRecord/parameterized/kafkaOutput1V.json"),
-                arguments("PublishRecord/parameterized/flowfileInput1.json",
-                        "account", ".*B.", getAttributes(), PublishStrategy.USE_WRAPPER,
-                        "PublishRecord/parameterized/kafkaOutput1W.json"),
-                arguments("PublishRecord/parameterized/flowfileInputA.json",
-                        "key", ".*1", getAttributes(), PublishStrategy.USE_VALUE,
-                        "PublishRecord/parameterized/kafkaOutputAV.json"),
-                arguments("PublishRecord/parameterized/flowfileInputA.json",
-                        "key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER,
-                        "PublishRecord/parameterized/kafkaOutputAW.json")
+                arguments("Publish/parameterized/flowfileInput1.json",
+                        "key1A", ".*A.", getAttributes(),
+                        "Publish/parameterized/kafkaOutput1A.json"),
+                arguments("Publish/parameterized/flowfileInput1.json",
+                        "key1B", ".*B.", getAttributes(),
+                        "Publish/parameterized/kafkaOutput1B.json"),
+                arguments("Publish/parameterized/flowfileInputA.json",
+                        "keyA1", ".*1", getAttributes(),
+                        "Publish/parameterized/kafkaOutputA1.json"),
+                arguments("Publish/parameterized/flowfileInputA.json",
+                        "keyA2", ".*2", getAttributes(),
+                        "Publish/parameterized/kafkaOutputA2.json")
         );
     }
 
     @ParameterizedTest
     @MethodSource("testCaseParametersProvider")
-    public void testPublishKafkaRecord(final String flowfileInputResource,
-                                       final String messageKeyField,
-                                       final String attributeNameRegex,
-                                       final Map<String, String> attributes,
-                                       final PublishStrategy publishStrategy,
-                                       final String kafkaRecordExpectedOutputResource)
-            throws IOException, InitializationException {
+    public void testPublishKafka(final String flowfileInputResource,
+                                 final String messageKey,
+                                 final String attributeNameRegex,
+                                 final Map<String, String> attributes,
+                                 final String kafkaRecordExpectedOutputResource)
+            throws IOException {
         final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull(
                 getClass().getClassLoader().getResource(flowfileInputResource)));
         logger.trace(new String(flowfileData, UTF_8));
@@ -114,12 +111,11 @@ public class TestPublishKafkaMockParameterized {
         final TestRunner runner = getTestRunner(producedRecords);
         runner.setProperty("topic", "test-topic");
         runner.setProperty("attribute-name-regex", attributeNameRegex);
-        runner.setProperty("message-key-field", messageKeyField);
-        runner.setProperty("publish-strategy", publishStrategy.name());
+        runner.setProperty("kafka-key", messageKey);
         runner.enqueue(flowFile);
         runner.run(1);
         // verify results
-        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
+        runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 1);
         assertEquals(1, producedRecords.size());
         final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next();
         final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter()
@@ -151,7 +147,7 @@ public class TestPublishKafkaMockParameterized {
         public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator,
                               final SerializerProvider serializerProvider) throws IOException {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeObjectField("RecordHeader-key",
+            jsonGenerator.writeStringField("RecordHeader-key",
                     (recordHeader.key() == null) ? null : recordHeader.key());
             jsonGenerator.writeObjectField("RecordHeader-value",
                     (recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8));
@@ -174,11 +170,15 @@ public class TestPublishKafkaMockParameterized {
         public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
                               SerializerProvider serializerProvider) throws IOException {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeObjectField("ProducerRecord-key",
-                    (producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key()));
+            jsonGenerator.writeStringField("ProducerRecord-key",
+                    (producerRecord.key() == null) ? null : new String(producerRecord.key(), StandardCharsets.UTF_8));
             jsonGenerator.writeObjectField("ProducerRecord-value",
                     (producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value()));
-            jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers());
+            final List<Header> headers = new ArrayList<>();
+            producerRecord.headers().forEach(headers::add);
+            final List<Header> headersSorted = headers.stream()
+                    .sorted(Comparator.comparing(Header::key)).collect(Collectors.toList());
+            jsonGenerator.writeObjectField("ProducerRecord-headers", headersSorted);
             jsonGenerator.writeEndObject();
         }
     }
@@ -192,15 +192,8 @@ public class TestPublishKafkaMockParameterized {
         return attributes;
     }
 
-    private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
-            throws InitializationException {
-        final String readerId = "record-reader";
-        final RecordReaderFactory readerService = new JsonTreeReader();
-        final String writerId = "record-writer";
-        final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
-        final String keyWriterId = "record-key-writer";
-        final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter();
-        final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
+    private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) {
+        final PublishKafka_2_6 processor = new PublishKafka_2_6() {
             @Override
             protected PublisherPool createPublisherPool(final ProcessContext context) {
                 return getPublisherPool(producedRecords, context);
@@ -208,15 +201,6 @@ public class TestPublishKafkaMockParameterized {
         };
         final TestRunner runner = TestRunners.newTestRunner(processor);
         runner.setValidateExpressionUsage(false);
-        runner.addControllerService(readerId, readerService);
-        runner.enableControllerService(readerService);
-        runner.setProperty(readerId, readerId);
-        runner.addControllerService(writerId, writerService);
-        runner.enableControllerService(writerService);
-        runner.setProperty(writerId, writerId);
-        runner.addControllerService(keyWriterId, keyWriterService);
-        runner.enableControllerService(keyWriterService);
-        runner.setProperty(keyWriterId, keyWriterId);
         return runner;
     }
 
@@ -229,10 +213,8 @@ public class TestPublishKafkaMockParameterized {
         final boolean useTransactions = context.getProperty("use-transactions").asBoolean();
         final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue();
         Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
-        final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
         final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue();
         final Charset charset = Charset.forName(charsetName);
-        final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class);
 
         return new PublisherPool(
                 Collections.emptyMap(),
@@ -243,8 +225,8 @@ public class TestPublishKafkaMockParameterized {
                 transactionalIdSupplier,
                 attributeNamePattern,
                 charset,
-                publishStrategy,
-                recordKeyWriterFactory) {
+                null,
+                null) {
             @Override
             public PublisherLease obtainPublisher() {
                 return getPublisherLease(producedRecords, context);
@@ -259,9 +241,6 @@ public class TestPublishKafkaMockParameterized {
                                              final ProcessContext context) {
         final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
         final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
-        final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
-                .asControllerService(RecordSetWriterFactory.class);
-        final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
 
         final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
         when(producer.send(any(), any())).then(invocation -> {
@@ -280,8 +259,8 @@ public class TestPublishKafkaMockParameterized {
                 true,
                 patternAttributeName,
                 UTF_8,
-                publishStrategy,
-                keyWriterFactory) {
+                null,
+                null) {
             @Override
             protected long getTimestamp() {
                 return 1000000000000L;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java
similarity index 86%
copy from nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
copy to nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java
index 4e97a6da84..f5785ac803 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.processors.kafka.pubsub;
 
 import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.util.DefaultIndenter;
 import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
 import com.fasterxml.jackson.databind.JsonSerializer;
@@ -72,7 +73,7 @@ import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-public class TestPublishKafkaMockParameterized {
+public class TestPublishKafkaRecordMockParameterized {
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     private final ObjectMapper mapper = getObjectMapper();
@@ -90,7 +91,17 @@ public class TestPublishKafkaMockParameterized {
                         "PublishRecord/parameterized/kafkaOutputAV.json"),
                 arguments("PublishRecord/parameterized/flowfileInputA.json",
                         "key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER,
-                        "PublishRecord/parameterized/kafkaOutputAW.json")
+                        "PublishRecord/parameterized/kafkaOutputAW.json"),
+
+                arguments("PublishRecord/parameterized/flowfileInputDoc1V.json",
+                        "account", "attribute.*", getAttributesDoc1(), PublishStrategy.USE_VALUE,
+                        "PublishRecord/parameterized/kafkaOutputDoc1V.json"),
+                arguments("PublishRecord/parameterized/flowfileInputDoc1W.json",
+                        null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER,
+                        "PublishRecord/parameterized/kafkaOutputDoc1W.json"),
+                arguments("PublishRecord/parameterized/flowfileInputDoc2W.json",
+                        null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER,
+                        "PublishRecord/parameterized/kafkaOutputDoc2W.json")
         );
     }
 
@@ -113,8 +124,12 @@ public class TestPublishKafkaMockParameterized {
         final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
         final TestRunner runner = getTestRunner(producedRecords);
         runner.setProperty("topic", "test-topic");
-        runner.setProperty("attribute-name-regex", attributeNameRegex);
-        runner.setProperty("message-key-field", messageKeyField);
+        if (attributeNameRegex != null) {
+            runner.setProperty("attribute-name-regex", attributeNameRegex);
+        }
+        if (messageKeyField != null) {
+            runner.setProperty("message-key-field", messageKeyField);
+        }
         runner.setProperty("publish-strategy", publishStrategy.name());
         runner.enqueue(flowFile);
         runner.run(1);
@@ -174,13 +189,23 @@ public class TestPublishKafkaMockParameterized {
         public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
                               SerializerProvider serializerProvider) throws IOException {
             jsonGenerator.writeStartObject();
-            jsonGenerator.writeObjectField("ProducerRecord-key",
-                    (producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key()));
-            jsonGenerator.writeObjectField("ProducerRecord-value",
-                    (producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value()));
+            serializeField(jsonGenerator, "ProducerRecord-key", producerRecord.key());
+            serializeField(jsonGenerator, "ProducerRecord-value", producerRecord.value());
             jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers());
             jsonGenerator.writeEndObject();
         }
+
+        private void serializeField(final JsonGenerator jsonGenerator, final String key, final byte[] value) throws IOException {
+            if (value == null) {
+                jsonGenerator.writeObjectField(key, null);
+            } else {
+                try {
+                    jsonGenerator.writeObjectField(key, objectMapper.readTree(value));
+                } catch (final JsonParseException e) {
+                    jsonGenerator.writeStringField(key, new String(value, UTF_8));
+                }
+            }
+        }
     }
 
     private static Map<String, String> getAttributes() {
@@ -192,6 +217,14 @@ public class TestPublishKafkaMockParameterized {
         return attributes;
     }
 
+    private static Map<String, String> getAttributesDoc1() {
+        final Map<String, String> attributes = new TreeMap<>();
+        attributes.put("attributeA", "valueA");
+        attributes.put("attributeB", "valueB");
+        attributes.put("otherAttribute", "otherValue");
+        return attributes;
+    }
+
     private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
             throws InitializationException {
         final String readerId = "record-reader";
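
The serializeField helper added above lets one serializer render both structured keys (JSON documents) and plain-string keys when writing the expected fixture output: it first tries to parse the bytes as JSON and falls back to a UTF-8 string on JsonParseException. A self-contained sketch of the same parse-or-fallback pattern with Jackson (class and method names here are illustrative):

    import com.fasterxml.jackson.core.JsonParseException;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    // Sketch: render a byte[] payload as parsed JSON when it is valid JSON,
    // otherwise fall back to treating it as a UTF-8 string.
    final class PayloadRenderSketch {
        private static final ObjectMapper MAPPER = new ObjectMapper();

        static Object render(final byte[] payload) throws IOException {
            if (payload == null) {
                return null;
            }
            try {
                return MAPPER.readTree(payload);  // JsonNode for structured values
            } catch (final JsonParseException e) {
                return new String(payload, StandardCharsets.UTF_8);  // e.g. "Acme Holdings"
            }
        }

        public static void main(final String[] args) throws IOException {
            System.out.println(render("{\"name\":\"Acme\"}".getBytes(StandardCharsets.UTF_8)));
            System.out.println(render("Acme Holdings".getBytes(StandardCharsets.UTF_8)));
        }
    }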
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json
new file mode 100644
index 0000000000..7764158297
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json
@@ -0,0 +1,8 @@
+{
+  "address": "1234 First Street",
+  "zip": "12345",
+  "account": {
+    "name": "Acme",
+    "number": "AC1234"
+  }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json
new file mode 100644
index 0000000000..ee19739c31
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json
@@ -0,0 +1,12 @@
+{
+  "key": {
+    "type": "person"
+  },
+  "value": {
+    "name": "Mark",
+    "number": 49
+  },
+  "headers": {
+    "headerA": "headerAValue"
+  }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json
new file mode 100644
index 0000000000..1fb83cc47c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json
@@ -0,0 +1,18 @@
+{
+  "ProducerRecord-key" : "key1A",
+  "ProducerRecord-value" : {
+    "address" : "1234 First Street",
+    "zip" : "12345",
+    "account" : {
+      "name" : "Acme",
+      "number" : "AC1234"
+    }
+  },
+  "ProducerRecord-headers" : [ {
+    "RecordHeader-key" : "attrKeyA1",
+    "RecordHeader-value" : "attrValueA1"
+  }, {
+    "RecordHeader-key" : "attrKeyA2",
+    "RecordHeader-value" : "attrValueA2"
+  } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json
new file mode 100644
index 0000000000..49e9804e43
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json
@@ -0,0 +1,18 @@
+{
+  "ProducerRecord-key" : "key1B",
+  "ProducerRecord-value" : {
+    "address" : "1234 First Street",
+    "zip" : "12345",
+    "account" : {
+      "name" : "Acme",
+      "number" : "AC1234"
+    }
+  },
+  "ProducerRecord-headers" : [ {
+    "RecordHeader-key" : "attrKeyB1",
+    "RecordHeader-value" : "attrValueB1"
+  }, {
+    "RecordHeader-key" : "attrKeyB2",
+    "RecordHeader-value" : "attrValueB2"
+  } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json
new file mode 100644
index 0000000000..baa3a1a927
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json
@@ -0,0 +1,22 @@
+{
+  "ProducerRecord-key" : "keyA1",
+  "ProducerRecord-value" : {
+    "key" : {
+      "type" : "person"
+    },
+    "value" : {
+      "name" : "Mark",
+      "number" : 49
+    },
+    "headers" : {
+      "headerA" : "headerAValue"
+    }
+  },
+  "ProducerRecord-headers" : [ {
+    "RecordHeader-key" : "attrKeyA1",
+    "RecordHeader-value" : "attrValueA1"
+  }, {
+    "RecordHeader-key" : "attrKeyB1",
+    "RecordHeader-value" : "attrValueB1"
+  } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json
new file mode 100644
index 0000000000..ba620f70b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json
@@ -0,0 +1,22 @@
+{
+  "ProducerRecord-key" : "keyA2",
+  "ProducerRecord-value" : {
+    "key" : {
+      "type" : "person"
+    },
+    "value" : {
+      "name" : "Mark",
+      "number" : 49
+    },
+    "headers" : {
+      "headerA" : "headerAValue"
+    }
+  },
+  "ProducerRecord-headers" : [ {
+    "RecordHeader-key" : "attrKeyA2",
+    "RecordHeader-value" : "attrValueA2"
+  }, {
+    "RecordHeader-key" : "attrKeyB2",
+    "RecordHeader-value" : "attrValueB2"
+  } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json
new file mode 100644
index 0000000000..7764158297
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json
@@ -0,0 +1,8 @@
+{
+  "address": "1234 First Street",
+  "zip": "12345",
+  "account": {
+    "name": "Acme",
+    "number": "AC1234"
+  }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json
new file mode 100644
index 0000000000..72db3c15ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json
@@ -0,0 +1,15 @@
+{
+  "key": "Acme Holdings",
+  "value": {
+    "address": "1234 First Street",
+    "zip": "12345",
+    "account": {
+      "name": "Acme",
+      "number":"AC1234"
+    }
+  },
+  "headers": {
+    "accountType": "enterprise",
+    "test": "true"
+  }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json
new file mode 100644
index 0000000000..1a2087851b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json
@@ -0,0 +1,15 @@
+{
+  "key": {
+    "accountName": "Acme Holdings",
+    "accountHolder": "John Doe",
+    "accountId": "280182830-A009"
+  },
+  "value": {
+    "address": "1234 First Street",
+    "zip": "12345",
+    "account": {
+      "name": "Acme",
+      "number":"AC1234"
+    }
+  }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json
new file mode 100644
index 0000000000..146b5fdb00
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json
@@ -0,0 +1,21 @@
+{
+  "ProducerRecord-key" : {
+    "name" : "Acme",
+    "number" : "AC1234"
+  },
+  "ProducerRecord-value" : {
+    "address" : "1234 First Street",
+    "zip" : "12345",
+    "account" : {
+      "name" : "Acme",
+      "number" : "AC1234"
+    }
+  },
+  "ProducerRecord-headers" : [ {
+    "RecordHeader-key" : "attributeA",
+    "RecordHeader-value" : "valueA"
+  }, {
+    "RecordHeader-key" : "attributeB",
+    "RecordHeader-value" : "valueB"
+  } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json
new file mode 100644
index 0000000000..f774579d56
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json
@@ -0,0 +1,18 @@
+{
+  "ProducerRecord-key" : "Acme Holdings",
+  "ProducerRecord-value" : {
+    "address" : "1234 First Street",
+    "zip" : "12345",
+    "account" : {
+      "name" : "Acme",
+      "number" : "AC1234"
+    }
+  },
+  "ProducerRecord-headers" : [ {
+    "RecordHeader-key" : "accountType",
+    "RecordHeader-value" : "enterprise"
+  }, {
+    "RecordHeader-key" : "test",
+    "RecordHeader-value" : "true"
+  } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json
new file mode 100644
index 0000000000..a38fb4e6c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json
@@ -0,0 +1,16 @@
+{
+  "ProducerRecord-key" : {
+    "accountName" : "Acme Holdings",
+    "accountHolder" : "John Doe",
+    "accountId" : "280182830-A009"
+  },
+  "ProducerRecord-value" : {
+    "address" : "1234 First Street",
+    "zip" : "12345",
+    "account" : {
+      "name" : "Acme",
+      "number" : "AC1234"
+    }
+  },
+  "ProducerRecord-headers" : [ ]
+}
\ No newline at end of file