Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/22 19:03:42 UTC

nifi git commit: NIFI-3953: This closes #1837. Allow multiple schemas on same kafka topic/partition for ConsumeKafkaRecord_0_10 Also, updated record writers to ensure that they write the schema as appropriate if not using a RecordSet. Updated ConsumeKafk

Repository: nifi
Updated Branches:
  refs/heads/master 6d16fdf17 -> 6937a6cf6


NIFI-3953: This closes #1837. Allow multiple schemas on the same Kafka topic/partition for ConsumeKafkaRecord_0_10
Also, updated record writers to ensure that they write the schema as appropriate if not using a RecordSet. Updated ConsumeKafkaRecord to allow for multiple schemas on the same topic and partition

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6937a6cf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6937a6cf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6937a6cf

Branch: refs/heads/master
Commit: 6937a6cf64c2f9e437b96550259575488eb284ec
Parents: 6d16fdf
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon May 22 12:02:18 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Mon May 22 14:37:53 2017 -0400

----------------------------------------------------------------------
 .../serialization/AbstractRecordSetWriter.java  |   4 +-
 .../processors/kafka/pubsub/ConsumerLease.java  | 179 ++++++++++---------
 .../processors/kafka/pubsub/PublisherLease.java |   1 +
 .../avro/WriteAvroResultWithExternalSchema.java |   8 +-
 .../org/apache/nifi/csv/WriteCSVResult.java     |   6 +
 .../org/apache/nifi/json/WriteJsonResult.java   |   7 +
 6 files changed, 120 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
index 6ce9138..4de5ce3 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/AbstractRecordSetWriter.java
@@ -68,7 +68,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
         return recordCount;
     }
 
-    protected final boolean isRecordSetActive() {
+    protected final boolean isActiveRecordSet() {
         return activeRecordSet;
     }
 
@@ -84,7 +84,7 @@ public abstract class AbstractRecordSetWriter implements RecordSetWriter {
 
     @Override
     public final WriteResult finishRecordSet() throws IOException {
-        if (!isRecordSetActive()) {
+        if (!isActiveRecordSet()) {
             throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
         }
 

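For context, the renamed accessor is what the writer implementations further down key off of. A minimal sketch (not the NiFi class itself) of the state an AbstractRecordSetWriter-style base class tracks, with the record type reduced to Object for brevity:

    // Simplified sketch of a record-set writer base class: beginRecordSet()
    // flips a flag, finishRecordSet() requires it, and isActiveRecordSet()
    // lets subclasses tell "record within a RecordSet" apart from a
    // standalone writeRecord() call.
    public abstract class SketchRecordSetWriter {

        private boolean activeRecordSet = false;
        private int recordCount = 0;

        public final void beginRecordSet() {
            activeRecordSet = true;
        }

        public final int finishRecordSet() {
            if (!isActiveRecordSet()) {
                throw new IllegalStateException("Cannot finish RecordSet because no RecordSet has begun");
            }
            activeRecordSet = false;
            return recordCount;
        }

        protected final boolean isActiveRecordSet() {
            return activeRecordSet;
        }

        public final void write(final Object record) {
            recordCount++;
            writeRecord(record);
        }

        // Subclasses decide whether to emit schema/header information here,
        // typically only when isActiveRecordSet() is false.
        protected abstract void writeRecord(Object record);
    }
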
http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
index 563ece6..effd2e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -21,22 +21,18 @@ import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10.RE
 import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
 import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
 
-import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import javax.xml.bind.DatatypeConverter;
 
@@ -57,11 +53,10 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.Tuple;
 
 /**
  * This class represents a lease to access a Kafka Consumer object. The lease is
@@ -411,94 +406,116 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
             return;
         }
 
-        FlowFile flowFile = session.create();
-        try {
-            final RecordSchema schema;
+        final Map<RecordSchema, Tuple<FlowFile, RecordSetWriter>> writers = new HashMap<>();
 
-            try {
-                schema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
-            } catch (final Exception e) {
-                logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+        // In order to obtain a RecordReader from the RecordReaderFactory, we need to give it a FlowFile.
+        // We don't want to create a new FlowFile for each record that we receive, so we will just create
+        // a "temporary flowfile" that will be removed in the finally block below and use that to pass to
+        // the createRecordReader method.
+        final FlowFile tempFlowFile = session.create();
+        try {
+            final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
+            while (itr.hasNext()) {
+                final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
+                final InputStream in = new ByteArrayInputStream(consumerRecord.value());
 
+                final Record record;
                 try {
-                    rollback(topicPartition);
-                } catch (final Exception rollbackException) {
-                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                }
+                    final RecordReader reader = readerFactory.createRecordReader(tempFlowFile, in, logger);
+                    record = reader.nextRecord();
+                } catch (final Exception e) {
+                    // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+                    final Map<String, String> attributes = new HashMap<>();
+                    attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+                    attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
+                    attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
 
-                yield();
-                throw new ProcessException(e);
-            }
+                    FlowFile failureFlowFile = session.create();
+                    failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+                    failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
 
-            final FlowFile ff = flowFile;
-            final AtomicReference<WriteResult> writeResult = new AtomicReference<>();
-            final AtomicReference<String> mimeTypeRef = new AtomicReference<>();
+                    final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
+                    session.getProvenanceReporter().receive(failureFlowFile, transitUri);
 
-            flowFile = session.write(flowFile, rawOut -> {
-                final Iterator<ConsumerRecord<byte[], byte[]>> itr = records.iterator();
+                    session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+                    logger.error("Failed to parse message from Kafka using the configured Record Reader. "
+                        + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
 
-                final RecordSchema emptySchema = new SimpleRecordSchema(Collections.emptyList());
-                final RecordSet recordSet = new RecordSet() {
-                    @Override
-                    public RecordSchema getSchema() throws IOException {
-                        return emptySchema;
-                    }
+                    session.adjustCounter("Parse Failures", 1, false);
+                    continue;
+                }
+
+                final RecordSchema recordSchema = record.getSchema();
 
-                    @Override
-                    public Record next() throws IOException {
-                        while (itr.hasNext()) {
-                            final ConsumerRecord<byte[], byte[]> consumerRecord = itr.next();
-
-                            final InputStream in = new ByteArrayInputStream(consumerRecord.value());
-                            try {
-                                final RecordReader reader = readerFactory.createRecordReader(ff, in, logger);
-                                final Record record = reader.nextRecord();
-                                return record;
-                            } catch (final Exception e) {
-                                final Map<String, String> attributes = new HashMap<>();
-                                attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
-                                attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
-                                attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
-
-                                FlowFile failureFlowFile = session.create();
-                                failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
-                                failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
-
-                                session.transfer(failureFlowFile, REL_PARSE_FAILURE);
-                                logger.error("Failed to parse message from Kafka using the configured Record Reader. "
-                                    + "Will route message as its own FlowFile to the 'parse.failure' relationship", e);
-
-                                session.adjustCounter("Parse Failures", 1, false);
-                            }
+                Tuple<FlowFile, RecordSetWriter> tuple = writers.get(recordSchema);
+                if (tuple == null) {
+                    FlowFile flowFile = session.create();
+                    final OutputStream rawOut = session.write(flowFile);
+
+                    final RecordSchema writeSchema;
+                    try {
+                        writeSchema = writerFactory.getSchema(flowFile, new ByteArrayInputStream(records.get(0).value()));
+                    } catch (final Exception e) {
+                        logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+                        try {
+                            rollback(topicPartition);
+                        } catch (final Exception rollbackException) {
+                            logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
                         }
 
-                        return null;
+                        yield();
+                        throw new ProcessException(e);
                     }
-                };
 
-                try (final OutputStream out = new BufferedOutputStream(rawOut);
-                    final RecordSetWriter writer = writerFactory.createWriter(logger, schema, ff, out)) {
-                    writeResult.set(writer.write(recordSet));
-                    mimeTypeRef.set(writer.getMimeType());
-                } catch (final Exception e) {
-                    logger.error("Failed to write records to FlowFile. Will roll back the Kafka message offsets.", e);
+                    final RecordSetWriter writer = writerFactory.createWriter(logger, writeSchema, flowFile, rawOut);
+                    writer.beginRecordSet();
 
-                    try {
-                        rollback(topicPartition);
-                    } catch (final Exception rollbackException) {
-                        logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
-                    }
+                    tuple = new Tuple<>(flowFile, writer);
+                    writers.put(recordSchema, tuple);
+                }
+
+                final RecordSetWriter writer = tuple.getValue();
+                writer.write(record);
+            }
+        } catch (final Exception e) {
+            logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+            try {
+                rollback(topicPartition);
+            } catch (final Exception rollbackException) {
+                logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+            }
+
+            throw new ProcessException(e);
+        } finally {
+            session.remove(tempFlowFile);
+        }
+
+        for (final Tuple<FlowFile, RecordSetWriter> tuple : writers.values()) {
+            FlowFile flowFile = tuple.getKey();
+            final RecordSetWriter writer = tuple.getValue();
 
-                    yield();
-                    throw new ProcessException(e);
+            final WriteResult writeResult;
+            try {
+                writeResult = writer.finishRecordSet();
+                writer.close();
+            } catch (final Exception e) {
+                logger.error("Failed to finish writing records to Content Repository", e);
+                try {
+                    rollback(topicPartition);
+                } catch (final Exception rollbackException) {
+                    logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
                 }
-            });
+                throw new ProcessException(e);
+            }
 
-            final WriteResult result = writeResult.get();
-            if (result.getRecordCount() > 0) {
-                final Map<String, String> attributes = new HashMap<>(result.getAttributes());
-                attributes.put(CoreAttributes.MIME_TYPE.key(), mimeTypeRef.get());
-                attributes.put("record.count", String.valueOf(result.getRecordCount()));
+            final int recordCount = writeResult.getRecordCount();
+            if (recordCount > 0) {
+                final Map<String, String> attributes = new HashMap<>();
+                attributes.putAll(writeResult.getAttributes());
+                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                attributes.put("record.count", String.valueOf(recordCount));
 
                 attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(topicPartition.partition()));
                 attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, topicPartition.topic());
@@ -509,17 +526,15 @@ public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListe
                 final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topicPartition.topic());
                 session.getProvenanceReporter().receive(flowFile, transitUri, executionDurationMillis);
 
-                session.adjustCounter("Records Received", result.getRecordCount(), false);
+                session.adjustCounter("Records Received", recordCount, false);
                 session.transfer(flowFile, REL_SUCCESS);
             } else {
                 session.remove(flowFile);
             }
-        } catch (final Exception e) {
-            session.remove(flowFile);
-            throw e;
         }
     }
 
+
     private void populateAttributes(final BundleTracker tracker) {
         final Map<String, String> kafkaAttrs = new HashMap<>();
         kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
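
The heart of the change above: instead of funneling an entire poll of Kafka messages through one RecordSet (and therefore one schema), the lease now keeps one FlowFile/RecordSetWriter pair per RecordSchema it encounters, keyed in a HashMap. A stripped-down sketch of that demultiplexing pattern, with the NiFi session, provenance and parse-failure plumbing omitted and hypothetical Schema/Record/RecordWriter stand-ins in place of the real NiFi types:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;

    // Hypothetical stand-ins for the NiFi abstractions; the real code works
    // with Record, RecordSchema, FlowFile and RecordSetWriter. The schema key
    // must implement equals/hashCode for the HashMap grouping to work.
    interface Schema { }
    interface Record { Schema getSchema(); }
    interface RecordWriter {
        void beginRecordSet();
        void write(Record record);
        int finishRecordSet();   // returns the number of records written
    }

    public class PerSchemaDemuxSketch {

        // Routes each parsed record to a writer dedicated to that record's
        // schema, so one poll that interleaves several schemas yields one
        // output stream (one FlowFile in NiFi) per schema.
        public static Map<Schema, RecordWriter> demux(final List<Record> records,
                                                      final Function<Schema, RecordWriter> writerFactory) {
            final Map<Schema, RecordWriter> writers = new HashMap<>();

            for (final Record record : records) {
                // Lazily create one writer per distinct schema and begin its record set.
                final RecordWriter writer = writers.computeIfAbsent(record.getSchema(), schema -> {
                    final RecordWriter w = writerFactory.apply(schema);
                    w.beginRecordSet();
                    return w;
                });
                writer.write(record);
            }

            // Finish every record set; in the real ConsumerLease this is where
            // the record.count and MIME type attributes are set and each FlowFile
            // is transferred to 'success' (or removed if it ended up empty).
            for (final RecordWriter writer : writers.values()) {
                writer.finishRecordSet();
            }
            return writers;
        }
    }

The committed code uses an explicit get/put on the map rather than computeIfAbsent because creating the RecordSetWriter can fail and has to roll back the Kafka offsets, and it routes messages the configured Record Reader cannot parse to the 'parse.failure' relationship before any writer is involved.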

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 66641df..4238956 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -112,6 +112,7 @@ public class PublisherLease implements Closeable {
             while ((record = recordSet.next()) != null) {
                 recordCount++;
                 baos.reset();
+
                 writer.write(record);
                 writer.flush();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
index 25d494e..8464e45 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResultWithExternalSchema.java
@@ -50,7 +50,6 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
         this.buffered = new BufferedOutputStream(out);
 
         datumWriter = new GenericDatumWriter<>(avroSchema);
-        schemaAccessWriter.writeHeader(recordSchema, buffered);
         encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
     }
 
@@ -67,6 +66,13 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
 
     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            flush();
+            schemaAccessWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
         final GenericRecord rec = AvroTypeUtil.createAvroRecord(record, avroSchema);
         datumWriter.write(rec, encoder);
         return schemaAccessWriter.getAttributes(recordSchema);

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 34a51ba..00270ed 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -95,6 +95,12 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
 
     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            schemaWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {
             fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));

http://git-wip-us.apache.org/repos/asf/nifi/blob/6937a6cf/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 8acaa04..cccac12 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -104,6 +104,13 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
 
     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            generator.flush();
+            schemaAccess.writeHeader(recordSchema, getOutputStream());
+        }
+
         writeRecord(record, recordSchema, generator, g -> g.writeStartObject(), g -> g.writeEndObject());
         return schemaAccess.getAttributes(recordSchema);
     }
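
The Avro, CSV and JSON writers all gain the same guard: when writeRecord() is called outside of a beginRecordSet()/finishRecordSet() pair, the schema (or CSV header) is written first so that a single-record FlowFile is still self-describing. A minimal, self-contained sketch of that guard, with a plain-text stand-in for the concrete schema-access writers; for illustration the sketch writes the header in beginRecordSet() for the record-set case, which each concrete writer handles in its own way:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    public class LazyHeaderWriterSketch {

        private final OutputStream out;
        private boolean activeRecordSet = false;

        public LazyHeaderWriterSketch(final OutputStream out) {
            this.out = out;
        }

        public void beginRecordSet() throws IOException {
            activeRecordSet = true;
            writeHeader();   // in this sketch, a full record set gets its header up front
        }

        public void writeRecord(final String record) throws IOException {
            // The guard added in this commit: a standalone writeRecord() call
            // (no active record set) must emit the schema/header itself so the
            // output remains self-describing.
            if (!activeRecordSet) {
                writeHeader();
            }
            out.write((record + "\n").getBytes(StandardCharsets.UTF_8));
        }

        private void writeHeader() throws IOException {
            // Stand-in for schemaAccessWriter.writeHeader(recordSchema, getOutputStream())
            // in the Avro writer, and for the equivalent calls in the CSV and JSON writers.
            out.write("# schema header\n".getBytes(StandardCharsets.UTF_8));
        }

        public static void main(final String[] args) throws IOException {
            final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            final LazyHeaderWriterSketch writer = new LazyHeaderWriterSketch(buffer);
            writer.writeRecord("{\"id\": 1}");   // no beginRecordSet(): the header is emitted here
            System.out.print(buffer.toString(StandardCharsets.UTF_8.name()));
        }
    }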