You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/11/30 18:23:29 UTC
[nifi] branch main updated: NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 282c56b5ce NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)
282c56b5ce is described below
commit 282c56b5ceca47caf711898c5929e35b964bb7fe
Author: greyp9 <gr...@users.noreply.github.com>
AuthorDate: Wed Nov 30 13:23:23 2022 -0500
NIFI-10901 - PublishKafka headers not sent in ProducerRecord (#6731)
---
.../processors/kafka/pubsub/PublisherLease.java | 3 +-
.../additionalDetails.html | 2 +-
.../pubsub/TestPublishKafkaMockParameterized.java | 97 +++++++++-------------
...> TestPublishKafkaRecordMockParameterized.java} | 49 +++++++++--
.../Publish/parameterized/flowfileInput1.json | 8 ++
.../Publish/parameterized/flowfileInputA.json | 12 +++
.../Publish/parameterized/kafkaOutput1A.json | 18 ++++
.../Publish/parameterized/kafkaOutput1B.json | 18 ++++
.../Publish/parameterized/kafkaOutputA1.json | 22 +++++
.../Publish/parameterized/kafkaOutputA2.json | 22 +++++
.../parameterized/flowfileInputDoc1V.json | 8 ++
.../parameterized/flowfileInputDoc1W.json | 15 ++++
.../parameterized/flowfileInputDoc2W.json | 15 ++++
.../parameterized/kafkaOutputDoc1V.json | 21 +++++
.../parameterized/kafkaOutputDoc1W.json | 18 ++++
.../parameterized/kafkaOutputDoc2W.json | 16 ++++
16 files changed, 275 insertions(+), 69 deletions(-)
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 0949e537b6..da37db0319 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -431,7 +431,8 @@ public class PublisherLease implements Closeable {
}
protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker, final Integer partition) {
- publish(flowFile, Collections.emptyList(), messageKey, messageContent, topic, tracker, partition);
+ final List<Header> headers = toHeaders(flowFile, Collections.emptyMap());
+ publish(flowFile, headers, messageKey, messageContent, topic, tracker, partition);
}
protected void publish(final FlowFile flowFile, final List<Header> headers, final byte[] messageKey, final byte[] messageContent,
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html
index ce87abc73d..e125b0f067 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_2_6/additionalDetails.html
@@ -375,7 +375,7 @@
<table border="thin">
<tr>
<th>Record Key</th>
- <td><code>Acme Accounts</code></td>
+ <td><code>Acme Holdings</code></td>
</tr>
<tr>
<th>Record Value</th>
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
index 4e97a6da84..b15f5d55a1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
@@ -31,17 +31,12 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.nifi.json.JsonRecordSetWriter;
-import org.apache.nifi.json.JsonTreeReader;
-import org.apache.nifi.kafka.shared.property.PublishStrategy;
import org.apache.nifi.kafka.shared.transaction.TransactionIdSupplier;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.reporting.InitializationException;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -57,12 +52,15 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.nio.charset.StandardCharsets.UTF_8;
@@ -79,30 +77,29 @@ public class TestPublishKafkaMockParameterized {
public static Stream<Arguments> testCaseParametersProvider() {
return Stream.of(
- arguments("PublishRecord/parameterized/flowfileInput1.json",
- "account", ".*A.", getAttributes(), PublishStrategy.USE_VALUE,
- "PublishRecord/parameterized/kafkaOutput1V.json"),
- arguments("PublishRecord/parameterized/flowfileInput1.json",
- "account", ".*B.", getAttributes(), PublishStrategy.USE_WRAPPER,
- "PublishRecord/parameterized/kafkaOutput1W.json"),
- arguments("PublishRecord/parameterized/flowfileInputA.json",
- "key", ".*1", getAttributes(), PublishStrategy.USE_VALUE,
- "PublishRecord/parameterized/kafkaOutputAV.json"),
- arguments("PublishRecord/parameterized/flowfileInputA.json",
- "key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER,
- "PublishRecord/parameterized/kafkaOutputAW.json")
+ arguments("Publish/parameterized/flowfileInput1.json",
+ "key1A", ".*A.", getAttributes(),
+ "Publish/parameterized/kafkaOutput1A.json"),
+ arguments("Publish/parameterized/flowfileInput1.json",
+ "key1B", ".*B.", getAttributes(),
+ "Publish/parameterized/kafkaOutput1B.json"),
+ arguments("Publish/parameterized/flowfileInputA.json",
+ "keyA1", ".*1", getAttributes(),
+ "Publish/parameterized/kafkaOutputA1.json"),
+ arguments("Publish/parameterized/flowfileInputA.json",
+ "keyA2", ".*2", getAttributes(),
+ "Publish/parameterized/kafkaOutputA2.json")
);
}
@ParameterizedTest
@MethodSource("testCaseParametersProvider")
- public void testPublishKafkaRecord(final String flowfileInputResource,
- final String messageKeyField,
- final String attributeNameRegex,
- final Map<String, String> attributes,
- final PublishStrategy publishStrategy,
- final String kafkaRecordExpectedOutputResource)
- throws IOException, InitializationException {
+ public void testPublishKafka(final String flowfileInputResource,
+ final String messageKey,
+ final String attributeNameRegex,
+ final Map<String, String> attributes,
+ final String kafkaRecordExpectedOutputResource)
+ throws IOException {
final byte[] flowfileData = IOUtils.toByteArray(Objects.requireNonNull(
getClass().getClassLoader().getResource(flowfileInputResource)));
logger.trace(new String(flowfileData, UTF_8));
@@ -114,12 +111,11 @@ public class TestPublishKafkaMockParameterized {
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", "test-topic");
runner.setProperty("attribute-name-regex", attributeNameRegex);
- runner.setProperty("message-key-field", messageKeyField);
- runner.setProperty("publish-strategy", publishStrategy.name());
+ runner.setProperty("kafka-key", messageKey);
runner.enqueue(flowFile);
runner.run(1);
// verify results
- runner.assertAllFlowFilesTransferred(PublishKafkaRecord_2_6.REL_SUCCESS, 1);
+ runner.assertAllFlowFilesTransferred(PublishKafka_2_6.REL_SUCCESS, 1);
assertEquals(1, producedRecords.size());
final ProducerRecord<byte[], byte[]> kafkaRecord = producedRecords.iterator().next();
final DefaultPrettyPrinter prettyPrinter = new DefaultPrettyPrinter()
@@ -151,7 +147,7 @@ public class TestPublishKafkaMockParameterized {
public void serialize(final RecordHeader recordHeader, final JsonGenerator jsonGenerator,
final SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
- jsonGenerator.writeObjectField("RecordHeader-key",
+ jsonGenerator.writeStringField("RecordHeader-key",
(recordHeader.key() == null) ? null : recordHeader.key());
jsonGenerator.writeObjectField("RecordHeader-value",
(recordHeader.value() == null) ? null : new String(recordHeader.value(), StandardCharsets.UTF_8));
@@ -174,11 +170,15 @@ public class TestPublishKafkaMockParameterized {
public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
- jsonGenerator.writeObjectField("ProducerRecord-key",
- (producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key()));
+ jsonGenerator.writeStringField("ProducerRecord-key",
+ (producerRecord.key() == null) ? null : new String(producerRecord.key(), StandardCharsets.UTF_8));
jsonGenerator.writeObjectField("ProducerRecord-value",
(producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value()));
- jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers());
+ final List<Header> headers = new ArrayList<>();
+ producerRecord.headers().forEach(headers::add);
+ final List<Header> headersSorted = headers.stream()
+ .sorted(Comparator.comparing(Header::key)).collect(Collectors.toList());
+ jsonGenerator.writeObjectField("ProducerRecord-headers", headersSorted);
jsonGenerator.writeEndObject();
}
}
@@ -192,15 +192,8 @@ public class TestPublishKafkaMockParameterized {
return attributes;
}
- private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
- throws InitializationException {
- final String readerId = "record-reader";
- final RecordReaderFactory readerService = new JsonTreeReader();
- final String writerId = "record-writer";
- final RecordSetWriterFactory writerService = new JsonRecordSetWriter();
- final String keyWriterId = "record-key-writer";
- final RecordSetWriterFactory keyWriterService = new JsonRecordSetWriter();
- final PublishKafkaRecord_2_6 processor = new PublishKafkaRecord_2_6() {
+ private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords) {
+ final PublishKafka_2_6 processor = new PublishKafka_2_6() {
@Override
protected PublisherPool createPublisherPool(final ProcessContext context) {
return getPublisherPool(producedRecords, context);
@@ -208,15 +201,6 @@ public class TestPublishKafkaMockParameterized {
};
final TestRunner runner = TestRunners.newTestRunner(processor);
runner.setValidateExpressionUsage(false);
- runner.addControllerService(readerId, readerService);
- runner.enableControllerService(readerService);
- runner.setProperty(readerId, readerId);
- runner.addControllerService(writerId, writerService);
- runner.enableControllerService(writerService);
- runner.setProperty(writerId, writerId);
- runner.addControllerService(keyWriterId, keyWriterService);
- runner.enableControllerService(keyWriterService);
- runner.setProperty(keyWriterId, keyWriterId);
return runner;
}
@@ -229,10 +213,8 @@ public class TestPublishKafkaMockParameterized {
final boolean useTransactions = context.getProperty("use-transactions").asBoolean();
final String transactionalIdPrefix = context.getProperty("transactional-id-prefix").evaluateAttributeExpressions().getValue();
Supplier<String> transactionalIdSupplier = new TransactionIdSupplier(transactionalIdPrefix);
- final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final String charsetName = context.getProperty("message-header-encoding").evaluateAttributeExpressions().getValue();
final Charset charset = Charset.forName(charsetName);
- final RecordSetWriterFactory recordKeyWriterFactory = context.getProperty("record-key-writer").asControllerService(RecordSetWriterFactory.class);
return new PublisherPool(
Collections.emptyMap(),
@@ -243,8 +225,8 @@ public class TestPublishKafkaMockParameterized {
transactionalIdSupplier,
attributeNamePattern,
charset,
- publishStrategy,
- recordKeyWriterFactory) {
+ null,
+ null) {
@Override
public PublisherLease obtainPublisher() {
return getPublisherLease(producedRecords, context);
@@ -259,9 +241,6 @@ public class TestPublishKafkaMockParameterized {
final ProcessContext context) {
final String attributeNameRegex = context.getProperty("attribute-name-regex").getValue();
final Pattern patternAttributeName = (attributeNameRegex == null) ? null : Pattern.compile(attributeNameRegex);
- final RecordSetWriterFactory keyWriterFactory = context.getProperty("record-key-writer")
- .asControllerService(RecordSetWriterFactory.class);
- final PublishStrategy publishStrategy = PublishStrategy.valueOf(context.getProperty("publish-strategy").getValue());
final Producer<byte[], byte[]> producer = mock(ProducerBB.class);
when(producer.send(any(), any())).then(invocation -> {
@@ -280,8 +259,8 @@ public class TestPublishKafkaMockParameterized {
true,
patternAttributeName,
UTF_8,
- publishStrategy,
- keyWriterFactory) {
+ null,
+ null) {
@Override
protected long getTimestamp() {
return 1000000000000L;
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java
similarity index 86%
copy from nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
copy to nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java
index 4e97a6da84..f5785ac803 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaMockParameterized.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecordMockParameterized.java
@@ -17,6 +17,7 @@
package org.apache.nifi.processors.kafka.pubsub;
import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.util.DefaultIndenter;
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
import com.fasterxml.jackson.databind.JsonSerializer;
@@ -72,7 +73,7 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-public class TestPublishKafkaMockParameterized {
+public class TestPublishKafkaRecordMockParameterized {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ObjectMapper mapper = getObjectMapper();
@@ -90,7 +91,17 @@ public class TestPublishKafkaMockParameterized {
"PublishRecord/parameterized/kafkaOutputAV.json"),
arguments("PublishRecord/parameterized/flowfileInputA.json",
"key", ".*2", getAttributes(), PublishStrategy.USE_WRAPPER,
- "PublishRecord/parameterized/kafkaOutputAW.json")
+ "PublishRecord/parameterized/kafkaOutputAW.json"),
+
+ arguments("PublishRecord/parameterized/flowfileInputDoc1V.json",
+ "account", "attribute.*", getAttributesDoc1(), PublishStrategy.USE_VALUE,
+ "PublishRecord/parameterized/kafkaOutputDoc1V.json"),
+ arguments("PublishRecord/parameterized/flowfileInputDoc1W.json",
+ null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER,
+ "PublishRecord/parameterized/kafkaOutputDoc1W.json"),
+ arguments("PublishRecord/parameterized/flowfileInputDoc2W.json",
+ null, null, Collections.emptyMap(), PublishStrategy.USE_WRAPPER,
+ "PublishRecord/parameterized/kafkaOutputDoc2W.json")
);
}
@@ -113,8 +124,12 @@ public class TestPublishKafkaMockParameterized {
final Collection<ProducerRecord<byte[], byte[]>> producedRecords = new ArrayList<>();
final TestRunner runner = getTestRunner(producedRecords);
runner.setProperty("topic", "test-topic");
- runner.setProperty("attribute-name-regex", attributeNameRegex);
- runner.setProperty("message-key-field", messageKeyField);
+ if (attributeNameRegex != null) {
+ runner.setProperty("attribute-name-regex", attributeNameRegex);
+ }
+ if (messageKeyField != null) {
+ runner.setProperty("message-key-field", messageKeyField);
+ }
runner.setProperty("publish-strategy", publishStrategy.name());
runner.enqueue(flowFile);
runner.run(1);
@@ -174,13 +189,23 @@ public class TestPublishKafkaMockParameterized {
public void serialize(ProducerRecord<byte[], byte[]> producerRecord, JsonGenerator jsonGenerator,
SerializerProvider serializerProvider) throws IOException {
jsonGenerator.writeStartObject();
- jsonGenerator.writeObjectField("ProducerRecord-key",
- (producerRecord.key() == null) ? null : objectMapper.readTree(producerRecord.key()));
- jsonGenerator.writeObjectField("ProducerRecord-value",
- (producerRecord.value() == null) ? null : objectMapper.readTree(producerRecord.value()));
+ serializeField(jsonGenerator, "ProducerRecord-key", producerRecord.key());
+ serializeField(jsonGenerator, "ProducerRecord-value", producerRecord.value());
jsonGenerator.writeObjectField("ProducerRecord-headers", producerRecord.headers());
jsonGenerator.writeEndObject();
}
+
+ private void serializeField(final JsonGenerator jsonGenerator, final String key, final byte[] value) throws IOException {
+ if (value == null) {
+ jsonGenerator.writeObjectField(key, null);
+ } else {
+ try {
+ jsonGenerator.writeObjectField(key, objectMapper.readTree(value));
+ } catch (final JsonParseException e) {
+ jsonGenerator.writeStringField(key, new String(value, UTF_8));
+ }
+ }
+ }
}
private static Map<String, String> getAttributes() {
@@ -192,6 +217,14 @@ public class TestPublishKafkaMockParameterized {
return attributes;
}
+ private static Map<String, String> getAttributesDoc1() {
+ final Map<String, String> attributes = new TreeMap<>();
+ attributes.put("attributeA", "valueA");
+ attributes.put("attributeB", "valueB");
+ attributes.put("otherAttribute", "otherValue");
+ return attributes;
+ }
+
private TestRunner getTestRunner(final Collection<ProducerRecord<byte[], byte[]>> producedRecords)
throws InitializationException {
final String readerId = "record-reader";
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json
new file mode 100644
index 0000000000..7764158297
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInput1.json
@@ -0,0 +1,8 @@
+{
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number": "AC1234"
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json
new file mode 100644
index 0000000000..ee19739c31
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/flowfileInputA.json
@@ -0,0 +1,12 @@
+{
+ "key": {
+ "type": "person"
+ },
+ "value": {
+ "name": "Mark",
+ "number": 49
+ },
+ "headers": {
+ "headerA": "headerAValue"
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json
new file mode 100644
index 0000000000..1fb83cc47c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1A.json
@@ -0,0 +1,18 @@
+{
+ "ProducerRecord-key" : "key1A",
+ "ProducerRecord-value" : {
+ "address" : "1234 First Street",
+ "zip" : "12345",
+ "account" : {
+ "name" : "Acme",
+ "number" : "AC1234"
+ }
+ },
+ "ProducerRecord-headers" : [ {
+ "RecordHeader-key" : "attrKeyA1",
+ "RecordHeader-value" : "attrValueA1"
+ }, {
+ "RecordHeader-key" : "attrKeyA2",
+ "RecordHeader-value" : "attrValueA2"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json
new file mode 100644
index 0000000000..49e9804e43
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutput1B.json
@@ -0,0 +1,18 @@
+{
+ "ProducerRecord-key" : "key1B",
+ "ProducerRecord-value" : {
+ "address" : "1234 First Street",
+ "zip" : "12345",
+ "account" : {
+ "name" : "Acme",
+ "number" : "AC1234"
+ }
+ },
+ "ProducerRecord-headers" : [ {
+ "RecordHeader-key" : "attrKeyB1",
+ "RecordHeader-value" : "attrValueB1"
+ }, {
+ "RecordHeader-key" : "attrKeyB2",
+ "RecordHeader-value" : "attrValueB2"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json
new file mode 100644
index 0000000000..baa3a1a927
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA1.json
@@ -0,0 +1,22 @@
+{
+ "ProducerRecord-key" : "keyA1",
+ "ProducerRecord-value" : {
+ "key" : {
+ "type" : "person"
+ },
+ "value" : {
+ "name" : "Mark",
+ "number" : 49
+ },
+ "headers" : {
+ "headerA" : "headerAValue"
+ }
+ },
+ "ProducerRecord-headers" : [ {
+ "RecordHeader-key" : "attrKeyA1",
+ "RecordHeader-value" : "attrValueA1"
+ }, {
+ "RecordHeader-key" : "attrKeyB1",
+ "RecordHeader-value" : "attrValueB1"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json
new file mode 100644
index 0000000000..ba620f70b7
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/Publish/parameterized/kafkaOutputA2.json
@@ -0,0 +1,22 @@
+{
+ "ProducerRecord-key" : "keyA2",
+ "ProducerRecord-value" : {
+ "key" : {
+ "type" : "person"
+ },
+ "value" : {
+ "name" : "Mark",
+ "number" : 49
+ },
+ "headers" : {
+ "headerA" : "headerAValue"
+ }
+ },
+ "ProducerRecord-headers" : [ {
+ "RecordHeader-key" : "attrKeyA2",
+ "RecordHeader-value" : "attrValueA2"
+ }, {
+ "RecordHeader-key" : "attrKeyB2",
+ "RecordHeader-value" : "attrValueB2"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json
new file mode 100644
index 0000000000..7764158297
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1V.json
@@ -0,0 +1,8 @@
+{
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number": "AC1234"
+ }
+}
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json
new file mode 100644
index 0000000000..72db3c15ff
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc1W.json
@@ -0,0 +1,15 @@
+{
+ "key": "Acme Holdings",
+ "value": {
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number":"AC1234"
+ }
+ },
+ "headers": {
+ "accountType": "enterprise",
+ "test": "true"
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json
new file mode 100644
index 0000000000..1a2087851b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/flowfileInputDoc2W.json
@@ -0,0 +1,15 @@
+{
+ "key": {
+ "accountName": "Acme Holdings",
+ "accountHolder": "John Doe",
+ "accountId": "280182830-A009"
+ },
+ "value": {
+ "address": "1234 First Street",
+ "zip": "12345",
+ "account": {
+ "name": "Acme",
+ "number":"AC1234"
+ }
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json
new file mode 100644
index 0000000000..146b5fdb00
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1V.json
@@ -0,0 +1,21 @@
+{
+ "ProducerRecord-key" : {
+ "name" : "Acme",
+ "number" : "AC1234"
+ },
+ "ProducerRecord-value" : {
+ "address" : "1234 First Street",
+ "zip" : "12345",
+ "account" : {
+ "name" : "Acme",
+ "number" : "AC1234"
+ }
+ },
+ "ProducerRecord-headers" : [ {
+ "RecordHeader-key" : "attributeA",
+ "RecordHeader-value" : "valueA"
+ }, {
+ "RecordHeader-key" : "attributeB",
+ "RecordHeader-value" : "valueB"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json
new file mode 100644
index 0000000000..f774579d56
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc1W.json
@@ -0,0 +1,18 @@
+{
+ "ProducerRecord-key" : "Acme Holdings",
+ "ProducerRecord-value" : {
+ "address" : "1234 First Street",
+ "zip" : "12345",
+ "account" : {
+ "name" : "Acme",
+ "number" : "AC1234"
+ }
+ },
+ "ProducerRecord-headers" : [ {
+ "RecordHeader-key" : "accountType",
+ "RecordHeader-value" : "enterprise"
+ }, {
+ "RecordHeader-key" : "test",
+ "RecordHeader-value" : "true"
+ } ]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json
new file mode 100644
index 0000000000..a38fb4e6c1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/test/resources/PublishRecord/parameterized/kafkaOutputDoc2W.json
@@ -0,0 +1,16 @@
+{
+ "ProducerRecord-key" : {
+ "accountName" : "Acme Holdings",
+ "accountHolder" : "John Doe",
+ "accountId" : "280182830-A009"
+ },
+ "ProducerRecord-value" : {
+ "address" : "1234 First Street",
+ "zip" : "12345",
+ "account" : {
+ "name" : "Acme",
+ "number" : "AC1234"
+ }
+ },
+ "ProducerRecord-headers" : [ ]
+}
\ No newline at end of file