Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/05/18 16:15:42 UTC

[GitHub] [nifi] markap14 commented on a diff in pull request #6045: NIFI-9822 - ConsumeKafkaRecord allows writing out Kafka record key

markap14 commented on code in PR #6045:
URL: https://github.com/apache/nifi/pull/6045#discussion_r874777463


##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {

Review Comment:
   Took me a minute to figure out what this means - `KafkaProcessorUtils.RECORD` wasn't immediately obvious to me. Perhaps it makes sense to rename `RECORD`, `STRING`, etc. to something that makes more sense outside the context, such as `KEY_AS_RECORD`, `KEY_AS_STRING`, etc.? It's a bit of a nitpick, and you can feel free to ignore it if you want.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -426,11 +465,13 @@ protected ConsumerPool createConsumerPool(final ProcessContext context, final Co
             }
 
             return new ConsumerPool(maxLeases, readerFactory, writerFactory, props, topics, maxUncommittedTime, securityProtocol,

Review Comment:
   Doesn't necessarily need to be done in this ticket. But probably makes sense to introduce a Builder pattern here instead of so many constructor args. It made sense before this, too, though :)
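
A minimal sketch of what the Builder suggestion could look like. `PoolSettings`, its fields, and its defaults are hypothetical stand-ins chosen for illustration; this is not the actual `ConsumerPool` API, which has many more arguments.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical settings holder used only to illustrate the pattern; not NiFi's ConsumerPool.
final class PoolSettings {
    private final int maxLeases;
    private final List<String> topics;
    private final long maxUncommittedTimeMillis;
    private final String securityProtocol;

    private PoolSettings(final Builder builder) {
        this.maxLeases = builder.maxLeases;
        // Defensive copy so later changes to the caller's list do not leak in.
        this.topics = Collections.unmodifiableList(new ArrayList<>(builder.topics));
        this.maxUncommittedTimeMillis = builder.maxUncommittedTimeMillis;
        this.securityProtocol = builder.securityProtocol;
    }

    int getMaxLeases() { return maxLeases; }
    List<String> getTopics() { return topics; }
    long getMaxUncommittedTimeMillis() { return maxUncommittedTimeMillis; }
    String getSecurityProtocol() { return securityProtocol; }

    static final class Builder {
        // Assumed defaults, for illustration only.
        private int maxLeases = 1;
        private List<String> topics = Collections.emptyList();
        private long maxUncommittedTimeMillis = 1000L;
        private String securityProtocol = "PLAINTEXT";

        Builder maxLeases(final int maxLeases) { this.maxLeases = maxLeases; return this; }
        Builder topics(final List<String> topics) { this.topics = topics; return this; }
        Builder maxUncommittedTimeMillis(final long millis) { this.maxUncommittedTimeMillis = millis; return this; }
        Builder securityProtocol(final String protocol) { this.securityProtocol = protocol; return this; }

        PoolSettings build() {
            // Validation lives in one place instead of being spread across constructor overloads.
            if (topics.isEmpty()) {
                throw new IllegalStateException("at least one topic is required");
            }
            return new PoolSettings(this);
        }
    }
}
```

Each call site then names the values it sets, and a new optional setting can be added without touching existing callers.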



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));

Review Comment:
   We'll definitely want to make sure that we document that this strategy requires that the key be a UTF-8 compatible String. And we should probably ensure that we test with a non-UTF-8 compatible String. In that case, the record should probably go to the parse.failure relationship.
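
One thing to keep in mind for that test: `new String(bytes, StandardCharsets.UTF_8)` does not fail on malformed input; it silently substitutes the replacement character. Detecting a non-UTF-8 key therefore needs a strict decoder. A minimal sketch, assuming a hypothetical helper named `StrictUtf8` (how the failure would be routed to parse.failure is not shown and is left to the PR):

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper, for illustration only.
final class StrictUtf8 {
    // Returns the decoded string, or empty if the bytes are not valid UTF-8.
    static Optional<String> tryDecode(final byte[] bytes) {
        try {
            final String decoded = StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)       // fail instead of substituting U+FFFD
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(bytes))
                    .toString();
            return Optional.of(decoded);
        } catch (final CharacterCodingException e) {
            return Optional.empty();
        }
    }
}
```

A single `0xFF` byte is a convenient test input, since that byte never appears in valid UTF-8.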



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);

Review Comment:
   Eventually we need to introduce a BYTES data type for Records. Right now, when we have an Array of type Byte, the Record API expects this to be an array of `Byte` objects, not primitive bytes. So, as inefficient as it is, in this case, I think we need to create a `Byte[]` for the key instead of providing the `byte[]`.
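
A minimal sketch of the boxing step, assuming a hypothetical helper named `ByteBoxing`; the PR might well inline the loop or reuse an existing utility instead:

```java
// Hypothetical helper, for illustration only.
final class ByteBoxing {
    // Copies a primitive byte[] into a Byte[] of the same length.
    static Byte[] box(final byte[] bytes) {
        final Byte[] boxed = new Byte[bytes.length];
        for (int i = 0; i < bytes.length; i++) {
            boxed[i] = bytes[i]; // autoboxing: byte -> Byte
        }
        return boxed;
    }
}
```

The copy is O(n) per key and allocates a second array, which is the inefficiency mentioned above.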



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -193,10 +197,21 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
                     additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
-
                 final byte[] messageContent = baos.toByteArray();
-                final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
-                final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+                final byte[] messageKey;
+                if (recordKeyWriterFactory == null) {
+                    messageKey = Optional.ofNullable(record.getAsString(messageKeyField))
+                            .map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+                } else {
+                    final ByteArrayOutputStream os = new ByteArrayOutputStream(1024);

Review Comment:
   Should add the BAOS to the try-with-resources below.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -255,6 +255,36 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL Context Service based on the
             ssl.client.auth property.
         </p>
+<h2>Output Modes</h2>
+<div>
+<p>This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property "Output Strategy")
+for converting Kafka records into flow files.
+- Output Strategy "Write Value Only" (the default) emits flowfile records containing only the Kafka record value.
+- Output Strategy "Use Wrapper" (new) emits flowfile records containing the Kafka record key, value, and headers, as
+well as additional metadata from the Kafka record.</p>
+
+<p>If the Output Strategy property "Use Wrapper" is active, an additional processor configuration property is activated,
+to fine-tune the transformation of the incoming Kafka record.
+- The format of the Kafka record key may be interpreted via property "Key Format" as "Byte Array", "String", or
+"Record".
+-- "Byte Array" supplies the Kafka Record Key bytes unchanged from the incoming Kafka record.
+-- "String" converts the Kafka Record Key bytes into a string using the UTF-8 character encoding.
+-- "Record" converts the Kafka Record Key bytes into a deserialized NiFi record, using the associated "Key Record
+Reader" controller service.</p>
+
+<p>If the Key Format property is set to "Record", an additional processor configuration property is activated.
+- "Key Record Reader" is used to specify the controller service that is used to deserialize the key bytes.  It may be
+set to any available implementation of the NiFi "RecordReaderFactory" interface.</p>
+
+<p>These new processor properties may be used to extend the capabilities of ConsumeKafkaRecord_2_6, by optionally
+incorporating additional information from the Kafka record (key, headers, metadata) into the outbound flowfile.  And
+the Kafka key data may now be interpreted as a record, rather than as a string, enabling additional decision-making by
+downstream processors in your flow.</p>
+
+<p>Additionally, the choice of the "Output Strategy" processor property affects the related properties "Headers to Add
+as Attributes (Regex)" and "Key Attribute Encoding".  These properties are available only when "Output Strategy" is set
+to "Write Value Only".</p>

Review Comment:
   Might make sense to mention the reason they are only available when Output Strategy = Write Value Only: they don't make sense when writing wrapper Records, because the headers and key are then part of the Record/FlowFile content rather than FlowFile attributes.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
+        final RecordField recordField = new RecordField(
+                "value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)));

Review Comment:
   Also looks like we have a member variable already for `headerCharacterSet` - probably makes sense to use that, rather than `StandardCharsets.UTF_8`



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
+        final RecordField recordField = new RecordField(
+                "value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)));

Review Comment:
   This is kind of expensive. The call to `toArray()` has to create a new array to hold the objects. Then we call `Arrays.stream()` and the creation of a Stream is quite expensive. And we do this just in order to iterate over the elements. But `Headers` extends `Iterable<Header>` so we could instead just use something like:
   ```
   for (final Header header : consumerRecord.headers()) {
     headers.put(header.key(), new String(header.value(), StandardCharsets.UTF_8));
   }
   ```



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
+        final RecordField recordField = new RecordField(
+                "value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)));
+        return new Tuple<>(recordField, headers);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordMetadata(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "metadata", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> metadata = new HashMap<>();
+        metadata.put("topic", consumerRecord.topic());
+        metadata.put("partition", Integer.toString(consumerRecord.partition()));
+        metadata.put("offset", Long.toString(consumerRecord.offset()));
+        metadata.put("timestamp", Long.toString(consumerRecord.timestamp()));
+        return new Tuple<>(recordField, metadata);

Review Comment:
   Given that the keys of this map are well known, and 3 of the fields are numeric, perhaps rather than a Map here, we should use a Record. Then we can use proper numeric types, even a TIMESTAMP type for the `timestamp` field?



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -255,6 +255,36 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL Context Service based on the
             ssl.client.auth property.
         </p>
+<h2>Output Modes</h2>
+<div>
+<p>This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property "Output Strategy")
+for converting Kafka records into flow files.
+- Output Strategy "Write Value Only" (the default) emits flowfile records containing only the Kafka record value.
+- Output Strategy "Use Wrapper" (new) emits flowfile records containing the Kafka record key, value, and headers, as
+well as additional metadata from the Kafka record.</p>

Review Comment:
   Should probably use `<ul>` with `<li>` rather than `-` for denoting lists.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);

Review Comment:
   Should use try-with-resources here to ensure that we close the InputStream and the Record Reader.
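
A minimal sketch of the shape this could take. To stay self-contained it uses a `BufferedReader` as a stand-in for the NiFi `RecordReader`, and the class and method names are hypothetical:

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical example class; BufferedReader stands in for the RecordReader.
final class KeyReading {
    static String readFirstLine(final byte[] key) {
        // Both resources are closed automatically, in reverse order of declaration,
        // whether the body returns normally or throws.
        try (InputStream is = new ByteArrayInputStream(key);
             BufferedReader reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8))) {
            return reader.readLine();
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In `toWrapperRecordKey` the same structure would declare the `InputStream` and the reader created by `keyReaderFactory` in the resource list, with the `nextRecord()` call inside the body.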



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_2_6.java:
##########
@@ -282,10 +314,13 @@ public class ConsumeKafkaRecord_2_6 extends AbstractProcessor implements Verifia
         descriptors.add(KafkaProcessorUtils.TOKEN_AUTH);
         descriptors.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
         descriptors.add(SEPARATE_BY_KEY);
+        descriptors.add(OUTPUT_STRATEGY);

Review Comment:
   Output Strategy (and related properties) is going to be a very important thing for the user to think through when configuring this. Because of that, I'd recommend moving this property up in the list to just after Group ID.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_2_6/additionalDetails.html:
##########
@@ -255,6 +255,36 @@ <h3>SASL_SSL</h3>
             See the SSL section for a description of how to configure the SSL Context Service based on the
             ssl.client.auth property.
         </p>
+<h2>Output Modes</h2>
+<div>
+<p>This processor (NiFi 1.17+) offers multiple output strategies (configured via processor property "Output Strategy")
+for converting Kafka records into flow files.
+- Output Strategy "Write Value Only" (the default) emits flowfile records containing only the Kafka record value.
+- Output Strategy "Use Wrapper" (new) emits flowfile records containing the Kafka record key, value, and headers, as

Review Comment:
   We need to be sure that we call out the Record Schema that will be used here.



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java:
##########
@@ -653,6 +671,71 @@ private void writeRecordData(final ProcessSession session, final List<ConsumerRe
         }
     }
 
+    private MapRecord toWrapperRecord(final ConsumerRecord<byte[], byte[]> consumerRecord, final Record record)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tupleKey  = toWrapperRecordKey(consumerRecord);
+        final Tuple<RecordField, Object> tupleValue  = toWrapperRecordValue(record);
+        final Tuple<RecordField, Object> tupleHeaders  = toWrapperRecordHeaders(consumerRecord);
+        final Tuple<RecordField, Object> tupleMetadata = toWrapperRecordMetadata(consumerRecord);
+        final RecordSchema rootRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                tupleKey.getKey(), tupleValue.getKey(), tupleHeaders.getKey(), tupleMetadata.getKey()));
+
+        final Map<String, Object> recordValues = new HashMap<>();
+        recordValues.put(tupleKey.getKey().getFieldName(), tupleKey.getValue());
+        recordValues.put(tupleValue.getKey().getFieldName(), tupleValue.getValue());
+        recordValues.put(tupleHeaders.getKey().getFieldName(), tupleHeaders.getValue());
+        recordValues.put(tupleMetadata.getKey().getFieldName(), tupleMetadata.getValue());
+        return new MapRecord(rootRecordSchema, recordValues);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordKey(final ConsumerRecord<byte[], byte[]> consumerRecord)
+            throws IOException, SchemaNotFoundException, MalformedRecordException {
+        final Tuple<RecordField, Object> tuple;
+        final byte[] key = consumerRecord.key() == null ? new byte[0] : consumerRecord.key();
+        if (KafkaProcessorUtils.RECORD.getValue().equals(keyFormat)) {
+            final Map<String, String> attributes = getAttributes(consumerRecord);
+            final InputStream is = new ByteArrayInputStream(key);
+            final RecordReader reader = keyReaderFactory.createRecordReader(attributes, is, key.length, logger);
+            final Record record = reader.nextRecord();
+            final RecordField recordField = new RecordField("key", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+            tuple = new Tuple<>(recordField, record);
+        } else if (KafkaProcessorUtils.STRING.getValue().equals(keyFormat)) {
+            final RecordField recordField = new RecordField("key", RecordFieldType.STRING.getDataType());
+            tuple = new Tuple<>(recordField, new String(key, StandardCharsets.UTF_8));
+        } else {
+            final RecordField recordField = new RecordField("key",
+                    RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()));
+            tuple = new Tuple<>(recordField, key);
+        }
+        return tuple;
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordValue(final Record record) {
+        final RecordField recordField = new RecordField(
+                "value", RecordFieldType.RECORD.getRecordDataType(record.getSchema()));
+        return new Tuple<>(recordField, record);
+    }
+
+    private Tuple<RecordField, Object> toWrapperRecordHeaders(final ConsumerRecord<byte[], byte[]> consumerRecord) {
+        final RecordField recordField = new RecordField(
+                "headers", RecordFieldType.MAP.getMapDataType(RecordFieldType.STRING.getDataType()));
+        final Map<String, String> headers = new HashMap<>();
+        Arrays.stream(consumerRecord.headers().toArray()).forEach(
+                h -> headers.put(h.key(), new String(h.value(), StandardCharsets.UTF_8)));

Review Comment:
   When the documentation is written, we'll also want to be sure it mentions that header values must be valid UTF-8. Unlike the Key, though, I think it probably makes sense to just log a warning and continue if a Header cannot be parsed as a UTF-8 String.
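
   One thing to keep in mind for the "log a warning and continue" behavior: `new String(bytes, StandardCharsets.UTF_8)` never fails on bad input; it silently substitutes U+FFFD for malformed bytes. Detecting an unparseable header needs a strict decoder. Below is a rough, self-contained sketch of that idea, not the PR's code. The class and method names (`HeaderDecoding`, `decodeUtf8`, `toHeaderMap`) are hypothetical, a plain `Map<String, byte[]>` stands in for the Kafka headers, and `System.err` stands in for the processor's logger. It also assumes a header value may be null, which the sketch treats the same as an undecodable one.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper, not part of NiFi or of this PR.
class HeaderDecoding {

    // Strictly decodes the bytes as UTF-8. Returns Optional.empty() for null
    // input or for any malformed byte sequence, instead of substituting U+FFFD.
    static Optional<String> decodeUtf8(final byte[] value) {
        if (value == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(value))
                    .toString());
        } catch (final CharacterCodingException e) {
            return Optional.empty();
        }
    }

    // Builds the headers map, skipping (and warning about) any header whose
    // value cannot be decoded. The rest of the record is still processed.
    static Map<String, String> toHeaderMap(final Map<String, byte[]> rawHeaders) {
        final Map<String, String> headers = new HashMap<>();
        for (final Map.Entry<String, byte[]> entry : rawHeaders.entrySet()) {
            final Optional<String> decoded = decodeUtf8(entry.getValue());
            if (decoded.isPresent()) {
                headers.put(entry.getKey(), decoded.get());
            } else {
                // Stand-in for a warn-level log call on the processor's logger.
                System.err.println("WARN: skipping header '" + entry.getKey()
                        + "': value is null or not valid UTF-8");
            }
        }
        return headers;
    }
}
```

   Whether an undecodable header should be dropped, as here, or written out with replacement characters is a design choice; either way the documentation should say which one happens.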



##########
nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-2-6-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java:
##########
@@ -193,10 +197,21 @@ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSet
                     additionalAttributes = writeResult.getAttributes();
                     writer.flush();
                 }
-
                 final byte[] messageContent = baos.toByteArray();
-                final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
-                final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+                final byte[] messageKey;
+                if (recordKeyWriterFactory == null) {
+                    messageKey = Optional.ofNullable(record.getAsString(messageKeyField))
+                            .map(s -> s.getBytes(StandardCharsets.UTF_8)).orElse(null);
+                } else {
+                    final ByteArrayOutputStream os = new ByteArrayOutputStream(1024);
+                    final MapRecord keyRecord = (MapRecord) record.getValue(messageKeyField);

Review Comment:
   This should probably be using `Record` here, not `MapRecord`.
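
   To illustrate the concern with stand-in types: the nested `Record` and `MapRecord` below are deliberately tiny, hypothetical substitutes for the NiFi classes, and `OtherRecord` and `keyRecordOf` are made up for the example. The point is only that a cast to the concrete class throws `ClassCastException` as soon as the field holds some other implementation of the interface, whereas checking against and casting to the interface accepts any of them.

```java
import java.util.Map;

// Hypothetical illustration; these nested types are NOT the NiFi classes.
class KeyRecordCast {

    // Reduced stand-in for the Record interface.
    interface Record {
        Object getValue(String fieldName);
    }

    // Reduced stand-in for the map-backed implementation.
    static final class MapRecord implements Record {
        private final Map<String, Object> values;

        MapRecord(final Map<String, Object> values) {
            this.values = values;
        }

        @Override
        public Object getValue(final String fieldName) {
            return values.get(fieldName);
        }
    }

    // Any other implementation a reader might hand back.
    static final class OtherRecord implements Record {
        @Override
        public Object getValue(final String fieldName) {
            return null;
        }
    }

    // Extracts the key field as a Record, depending only on the interface.
    // Returns null when the field is absent; rejects non-record values clearly.
    static Record keyRecordOf(final Record record, final String messageKeyField) {
        final Object value = record.getValue(messageKeyField);
        if (value == null) {
            return null;
        }
        if (!(value instanceof Record)) {
            throw new IllegalArgumentException(
                    "Field '" + messageKeyField + "' is not a record: " + value.getClass().getName());
        }
        return (Record) value;
    }
}
```

   With the interface-typed version, a non-record key field also gets an explicit error message rather than a bare `ClassCastException`.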


