Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/22 19:03:42 UTC
nifi git commit: NIFI-3953: This closes #1837. Allow multiple schemas on same kafka topic/partition for ConsumeKafkaRecord_0_10 Also, updated record writers to ensure that they write the schema as appropriate if not using a RecordSet. Updated ConsumeKafk
Repository: nifi
Updated Branches:
refs/heads/master 6d16fdf17 -> 6937a6cf6
NIFI-3953: This closes #1837. Allow multiple schemas on same kafka topic/partition for ConsumeKafkaRecord_0_10
Also, updated record writers to ensure that they write the schema as appropriate if not using a RecordSet. Updated ConsumeKafkaRecord to allow for multiple schemas to be on same topic and partition
Signed-off-by: joewitt <jo...@apache.org>
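
For readers skimming the diff below: the core of the ConsumerLease change is that records pulled in a single poll are grouped by the schema they were read with, with one FlowFile and one RecordSetWriter created lazily per schema and finished independently at the end of the batch. The following is a minimal, standalone sketch of that grouping idea only; the Message type and the string schema key are hypothetical stand-ins, while the real code keys a Map on RecordSchema and stores a Tuple<FlowFile, RecordSetWriter>.

// Minimal sketch of the per-schema grouping introduced in ConsumerLease (see diff below).
// "Message" and the string-keyed map are hypothetical stand-ins, not the NiFi API.
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerSchemaGroupingSketch {

    // Stand-in for a consumed Kafka record together with the schema it was parsed with.
    record Message(String schemaName, String payload) {}

    public static void main(String[] args) {
        final List<Message> batch = List.of(
                new Message("v1", "alpha"),
                new Message("v2", "bravo"),
                new Message("v1", "charlie"));

        // One output group per schema, created lazily the first time that schema is seen,
        // so a single topic/partition poll may freely mix schemas.
        final Map<String, List<String>> groups = new HashMap<>();
        for (final Message msg : batch) {
            groups.computeIfAbsent(msg.schemaName(), s -> new ArrayList<>()).add(msg.payload());
        }

        // Each group is then finished and transferred independently, mirroring the
        // finishRecordSet()/transfer loop at the end of the patched consume logic.
        groups.forEach((schema, payloads) ->
                System.out.println(schema + " -> " + payloads.size() + " record(s)"));
    }
}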
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6937a6cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6937a6cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6937a6cf
Branch: refs/heads/master
Commit: 6937a6cf64c2f9e437b96550259575488eb284ec
Parents: 6d16fdf
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon May 22 12:02:18 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon May 22 14:37:53 2017 -0400
----------------------------------------------------------------------
.../serialization/AbstractRecordSetWriter.java | 4 +-
.../processors/kafka/pubsub/ConsumerLease.java | 179 ++++++++++---------
.../processors/kafka/pubsub/PublisherLease.java | 1 +
.../avro/WriteAvroResultWithExternalSchema.java | 8 +-
.../org/apache/nifi/csv/WriteCSVResult.java | 6 +
.../org/apache/nifi/json/WriteJsonResult.java | 7 +
6 files changed, 120 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 6ce9138..4de5ce3 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -68,7 +68,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
return recordCount;
}
- protected final boolean isRecordSetActive() {
+ protected final boolean isActiveRecordSet() {
return activeRecordSet;
}
@@ -84,7 +84,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
@Override
public final WriteResult finishRecordSet() throws IOException {
- if (!isRecordSetActive()) {
+ if (!isActiveRecordSet()) {
throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 563ece6..effd2e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -21,22 +21,18 @@ import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.RE
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
-import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
-import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import javax.xml.bind.DatatypeConverter;
@@ -57,11 +53,10 @@ import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.Tuple;
/**
* This class represents a lease to access a Kafka Consumer object. The lease is
@@ -411,94 +406,116 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
return;
}
- FlowFile flowFile = session.create();
- try {
- final RecordSchema schema;
+ final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
- try {
- schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
- } catch (final Exception e) {
- logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+ // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
+ // We don't want to create a new FlowFile for each record that we receive, so we will just create
+ // a "temporary flowfile" that will be removed in the finally block below and use that to pass to
+ // the createRecordReader method.
+ final FlowFile tempFlowFile = session.create();
+ try {
+ final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
+ while (itr.hasNext()) {
+ final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
+ final InputStream in = new ByteArrayInputStream(consumerRecord.value());
+ final Record record;
try {
- rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
+ final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
+ record = reader.nextRecord();
+ } catch (final Exception e) {
+ // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
- yield();
- throw new ProcessException(e);
- }
+ FlowFile failureFlowFile = session.create();
+ failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+ failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
- final FlowFile ff = flowFile;
- final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
- final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+ session.getProvenanceReporter().receive(failureFlowFile, transitUri);
- flowFile = session.write(flowFile, rawOut -> {
- final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
+ session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+ logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
- final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
- final RecordSet recordSet = new RecordSet() {
- @Override
- public RecordSchema getSchema() throws IOException {
- return emptySchema;
- }
+ session.adjustCounter("Parse Failures", 1, false);
+ continue;
+ }
+
+ final RecordSchema recordSchema = record.getSchema();
- @Override
- public Record next() throws IOException {
- while (itr.hasNext()) {
- final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
-
- final InputStream in = new ByteArrayInputStream(consumerRecord.value());
- try {
- final RecordReader reader = readerFactory.createRecordReader(ff, in, logger);
- final Record record = reader.nextRecord();
- return record;
- } catch (final Exception e) {
- final Map<String, String> attributes = new HashMap<>();
- attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
- attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
- attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
- FlowFile failureFlowFile = session.create();
- failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
- failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-
- session.transfer(failureFlowFile, REL_PARSE_FAILURE);
- logger.error("Failed to parse message from Kafka using the configured Record Reader. "
- + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
-
- session.adjustCounter("Parse Failures", 1, false);
- }
+ Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
+ if (tuple == null) {
+ FlowFile flowFile = session.create();
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
- return null;
+ yield();
+ throw new ProcessException(e);
}
- };
- try (final OutputStream out = new BufferedOutputStream(rawOut);
- final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) {
- writeResult.set(writer.write(recordSet));
- mimeTypeRef.set(writer.getMimeType());
- } catch (final Exception e) {
- logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e);
+ final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
+ writer.beginRecordSet();
- try {
- rollback(topicPartition);
- } catch (final Exception rollbackException) {
- logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
- }
+ tuple = new Tuple<>(flowFile, writer);
+ writers.put(recordSchema, tuple);
+ }
+
+ final RecordSetWriter writer = tuple.getValue();
+ writer.write(record);
+ }
+ } catch (final Exception e) {
+ logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
+
+ throw new ProcessException(e);
+ } finally {
+ session.remove(tempFlowFile);
+ }
+
+ for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+ FlowFile flowFile = tuple.getKey();
+ final RecordSetWriter writer = tuple.getValue();
- yield();
- throw new ProcessException(e);
+ final WriteResult writeResult;
+ try {
+ writeResult = writer.finishRecordSet();
+ writer.close();
+ } catch (final Exception e) {
+ logger.error("Failed to finish writing records to Content Repository", e);
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
}
- });
+ throw new ProcessException(e);
+ }
- final WriteResult result = writeResult.get();
- if (result.getRecordCount() > 0) {
- final Map<String, String> attributes = new HashMap<>(result.getAttributes());
- attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
- attributes.put("record.count", String.valueOf(result.getRecordCount()));
+ final int recordCount = writeResult.getRecordCount();
+ if (recordCount > 0) {
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+ attributes.put("record.count", String.valueOf(recordCount));
attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
@@ -509,17 +526,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
- session.adjustCounter("Records Received", result.getRecordCount(), false);
+ session.adjustCounter("Records Received", recordCount, false);
session.transfer(flowFile, REL_SUCCESS);
} else {
session.remove(flowFile);
}
- } catch (final Exception e) {
- session.remove(flowFile);
- throw e;
}
}
+
private void populateAttributes(final BundleTracker tracker) {
final Map<String, String> kafkaAttrs = new HashMap<>();
kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 66641df..4238956 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -112,6 +112,7 @@ public class PublisherLease implements Closeable {
while ((record = recordSet.next()) != null) {
recordCount++;
baos.reset();
+
writer.write(record);
writer.flush();
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index 25d494e..8464e45 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -50,7 +50,6 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
this.buffered = new BufferedOutputStream(out);
datumWriter = new GenericDatumWriter<>(avroSchema);
- schemaAccessWriter.writeHeader(recordSchema, buffered);
encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
}
@@ -67,6 +66,13 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
+ // If we are not writing an active record set, then we need to ensure that we write the
+ // schema information.
+ if (!isActiveRecordSet()) {
+ flush();
+ schemaAccessWriter.writeHeader(recordSchema, getOutputStream());
+ }
+
final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
datumWriter.write(rec, encoder);
return schemaAccessWriter.getAttributes(recordSchema);
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 34a51ba..00270ed 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -95,6 +95,12 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
+ // If we are not writing an active record set, then we need to ensure that we write the
+ // schema information.
+ if (!isActiveRecordSet()) {
+ schemaWriter.writeHeader(recordSchema, getOutputStream());
+ }
+
int i = 0;
for (final RecordField recordField : recordSchema.getFields()) {
fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 8acaa04..cccac12 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -104,6 +104,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
@Override
public Map<String, String> writeRecord(final Record record) throws IOException {
+ // If we are not writing an active record set, then we need to ensure that we write the
+ // schema information.
+ if (!isActiveRecordSet()) {
+ generator.flush();
+ schemaAccess.writeHeader(recordSchema, getOutputStream());
+ }
+
writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
return schemaAccess.getAttributes(recordSchema);
}
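
The Avro, CSV, and JSON writer changes above all follow the same pattern: when writeRecord() is called outside of an active record set, the writer itself emits the schema header before the record instead of relying on beginRecordSet(). Below is a standalone sketch of that pattern; SketchRecordWriter and its plain text header are hypothetical stand-ins for the NiFi writers and their configured SchemaAccessWriter, and details such as flushing a buffering encoder first are omitted.

// Sketch of the header-on-standalone-write pattern shared by the three writer changes above.
// The class and its text header are hypothetical stand-ins, not the NiFi API.
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

class SketchRecordWriter {
    private final OutputStream out;
    private boolean activeRecordSet;   // would be toggled by beginRecordSet()/finishRecordSet()

    SketchRecordWriter(final OutputStream out) {
        this.out = out;
    }

    void beginRecordSet() throws IOException {
        activeRecordSet = true;
        writeHeader();                 // header written once for the whole record set
    }

    void writeRecord(final String record) throws IOException {
        // Mirrors the change above: with no active record set, the writer is responsible
        // for emitting the schema header itself before the record is written.
        if (!activeRecordSet) {
            writeHeader();
        }
        out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
    }

    private void writeHeader() throws IOException {
        out.write("schema-header\n".getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        new SketchRecordWriter(buffer).writeRecord("first record");
        System.out.print(buffer);      // header appears even though beginRecordSet() was never called
    }
}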