Posted to commits@nifi.apache.org by jo...@apache.org on 2017/09/23 02:56:37 UTC
[4/5] nifi git commit: NIFI-4201: This closes #2024. Implementation
of processors for interacting with Kafka 0.11
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
new file mode 100644
index 0000000..de19648
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerLease.java
@@ -0,0 +1,691 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_PARSE_FAILURE;
+import static org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11.REL_SUCCESS;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.HEX_ENCODING;
+import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_ENCODING;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+/**
+ * This class represents a lease to access a Kafka Consumer object. The lease is
+ * intended to be obtained from a ConsumerPool. The lease is closeable to allow
+ * for the clean model of try-with-resources, whereby the non-exceptional case means
+ * the lease will be returned to the pool for future use by others. A given
+ * lease may only belong to a single thread at a time.
+ */
+public abstract class ConsumerLease implements Closeable, ConsumerRebalanceListener {
+
+ private final long maxWaitMillis;
+ private final Consumer<byte[], byte[]> kafkaConsumer;
+ private final ComponentLog logger;
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
+ private final RecordSetWriterFactory writerFactory;
+ private final RecordReaderFactory readerFactory;
+ private final Charset headerCharacterSet;
+ private final Pattern headerNamePattern;
+ private boolean poisoned = false;
+ //used for tracking demarcated flowfiles to their TopicPartition so we can append
+ //to them on subsequent poll calls
+ private final Map<BundleInformation, BundleTracker> bundleMap = new HashMap<>();
+ private final Map<TopicPartition, OffsetAndMetadata> uncommittedOffsetsMap = new HashMap<>();
+ private long leaseStartNanos = -1;
+ private boolean lastPollEmpty = false;
+ private int totalMessages = 0;
+
+ ConsumerLease(
+ final long maxWaitMillis,
+ final Consumer<byte[], byte[]> kafkaConsumer,
+ final byte[] demarcatorBytes,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
+ final ComponentLog logger,
+ final Charset headerCharacterSet,
+ final Pattern headerNamePattern) {
+ this.maxWaitMillis = maxWaitMillis;
+ this.kafkaConsumer = kafkaConsumer;
+ this.demarcatorBytes = demarcatorBytes;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ this.logger = logger;
+ this.headerCharacterSet = headerCharacterSet;
+ this.headerNamePattern = headerNamePattern;
+ }
+
+ /**
+ * clears out internal state elements excluding session and consumer as
+ * those are managed by the pool itself
+ */
+ private void resetInternalState() {
+ bundleMap.clear();
+ uncommittedOffsetsMap.clear();
+ leaseStartNanos = -1;
+ lastPollEmpty = false;
+ totalMessages = 0;
+ }
+
+ /**
+ * Kafka will call this method whenever it is about to rebalance the
+ * consumers for the given partitions. We'll simply take this to mean that
+ * we need to quickly commit what we've got and return the consumer to
+ * the pool. This method will be called during the poll() method call of
+ * this class, and by the same thread calling poll, according to the Kafka
+ * API docs. After this method executes, the session and kafka offsets are
+ * committed and this lease is closed.
+ *
+ * @param partitions partitions being reassigned
+ */
+ @Override
+ public void onPartitionsRevoked(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' revoked for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ //force a commit here. Can reuse the session and consumer after this but must commit now to avoid duplicates if kafka reassigns partition
+ commit();
+ }
+
+ /**
+ * This will be called by Kafka when the rebalance has completed. We don't
+ * need to do anything with this information other than optionally log it as
+ * by this point we've committed what we've got and moved on.
+ *
+ * @param partitions topic partition set being reassigned
+ */
+ @Override
+ public void onPartitionsAssigned(final Collection<TopicPartition> partitions) {
+ logger.debug("Rebalance Alert: Paritions '{}' assigned for lease '{}' with consumer '{}'", new Object[]{partitions, this, kafkaConsumer});
+ }
+
+ /**
+ * Executes a poll on the underlying Kafka Consumer and creates any new
+ * flowfiles necessary or appends to existing ones if in demarcation mode.
+ */
+ void poll() {
+ /**
+ * Implementation note:
+ * Even if ConsumeKafka is not scheduled to poll for longer than session.timeout.ms (defaults to 10 sec)
+ * because downstream connection back-pressure is engaged, the Kafka consumer sends heartbeats from a
+ * background thread. If this situation lasts longer than max.poll.interval.ms (defaults to 5 min), the
+ * Kafka consumer sends a Leave Group request to the Group Coordinator. When the ConsumeKafka processor
+ * is scheduled again, the Kafka client checks whether this client instance is still a part of the
+ * consumer group. If not, it rejoins before polling messages.
+ * This heartbeat behavior was introduced by Kafka KIP-62 and is available from Kafka client 0.10.1.0 onward.
+ */
+ try {
+ final ConsumerRecords<byte[], byte[]> records = kafkaConsumer.poll(10);
+ lastPollEmpty = records.count() == 0;
+ processRecords(records);
+ } catch (final ProcessException pe) {
+ throw pe;
+ } catch (final Throwable t) {
+ this.poison();
+ throw t;
+ }
+ }
+
+ /**
+ * Notifies Kafka to commit the offsets for the specified topic/partition
+ * pairs to the specified offsets with the given metadata. This can offer
+ * higher performance than the other commitOffsets call as it allows the
+ * kafka client to collect more data from Kafka before committing the
+ * offsets.
+ *
+ * @return false if we didn't do anything and the caller should probably
+ * yield; true if we committed new data
+ */
+ boolean commit() {
+ if (uncommittedOffsetsMap.isEmpty()) {
+ resetInternalState();
+ return false;
+ }
+ try {
+ /**
+ * Committing the nifi session then the offsets means we have an at
+ * least once guarantee here. If we reversed the order we'd have at
+ * most once.
+ */
+ final Collection<FlowFile> bundledFlowFiles = getBundles();
+ if (!bundledFlowFiles.isEmpty()) {
+ getProcessSession().transfer(bundledFlowFiles, REL_SUCCESS);
+ }
+ getProcessSession().commit();
+
+ final Map<TopicPartition, OffsetAndMetadata> offsetsMap = uncommittedOffsetsMap;
+ kafkaConsumer.commitSync(offsetsMap);
+ resetInternalState();
+ return true;
+ } catch (final IOException ioe) {
+ poison();
+ logger.error("Failed to finish writing out FlowFile bundle", ioe);
+ throw new ProcessException(ioe);
+ } catch (final KafkaException kex) {
+ poison();
+ logger.warn("Duplicates are likely as we were able to commit the process"
+ + " session but received an exception from Kafka while committing"
+ + " offsets.");
+ throw kex;
+ } catch (final Throwable t) {
+ poison();
+ throw t;
+ }
+ }
+
+ /**
+ * Indicates whether we should continue polling for data. If we are not
+ * writing data with a demarcator then we're writing individual flow files
+ * per kafka message, so we must be very mindful of memory usage for the
+ * flow file objects (not their content) being held in memory. The content
+ * of kafka messages will be written to the content repository immediately
+ * upon each poll call, but we must still be mindful of how much memory can
+ * be used in each poll call. We will indicate that we should stop polling
+ * if our last poll call produced no new results, if we've been polling and
+ * processing data longer than the specified maximum polling time, if we
+ * have reached our specified max flow file limit, or if a rebalance has
+ * been initiated for one of the partitions we're watching; otherwise true.
+ *
+ * @return true if should keep polling; false otherwise
+ */
+ boolean continuePolling() {
+ //stop if the last poll produced no new data
+ if (lastPollEmpty) {
+ return false;
+ }
+
+ //stop if we've gone past our desired max uncommitted wait time
+ if (leaseStartNanos < 0) {
+ leaseStartNanos = System.nanoTime();
+ }
+ final long durationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ if (durationMillis > maxWaitMillis) {
+ return false;
+ }
+
+ //stop if we've generated enough flowfiles that we need to be concerned about memory usage for the objects
+ if (bundleMap.size() > 200) { //a magic number - the number of simultaneous bundles to track
+ return false;
+ } else {
+ return totalMessages < 1000; //admittedly a magic number - good candidate for a processor property
+ }
+ }
+
+ /**
+ * Indicates that the underlying session and consumer should be immediately
+ * considered invalid. Once closed the session will be rolled back and the
+ * pool should destroy the underlying consumer. This is useful if due to
+ * external reasons, such as the processor no longer being scheduled, this
+ * lease should be terminated immediately.
+ */
+ private void poison() {
+ poisoned = true;
+ }
+
+ /**
+ * @return true if this lease has been poisoned; false otherwise
+ */
+ boolean isPoisoned() {
+ return poisoned;
+ }
+
+ /**
+ * Trigger the consumer's {@link KafkaConsumer#wakeup() wakeup()} method.
+ */
+ public void wakeup() {
+ kafkaConsumer.wakeup();
+ }
+
+ /**
+ * Method that is intended to be overridden by the pool that created this
+ * ConsumerLease object. It should ensure that the session used to create
+ * this lease is rolled back and that the underlying kafka consumer is
+ * either returned to the pool for continued use or destroyed if this
+ * lease has been poisoned. It can only be called once. Calling it more
+ * than once can result in undefined and non-threadsafe behavior.
+ */
+ @Override
+ public void close() {
+ resetInternalState();
+ }
+
+ public abstract ProcessSession getProcessSession();
+
+ public abstract void yield();
+
+ private void processRecords(final ConsumerRecords<byte[], byte[]> records) {
+ records.partitions().stream().forEach(partition -> {
+ List<ConsumerRecord<byte[], byte[]>> messages = records.records(partition);
+ if (!messages.isEmpty()) {
+ //update maximum offset map for this topic partition
+ long maxOffset = messages.stream()
+ .mapToLong(record -> record.offset())
+ .max()
+ .getAsLong();
+
+ //write records to content repository and session
+ if (demarcatorBytes != null) {
+ writeDemarcatedData(getProcessSession(), messages, partition);
+ } else if (readerFactory != null && writerFactory != null) {
+ writeRecordData(getProcessSession(), messages, partition);
+ } else {
+ messages.stream().forEach(message -> {
+ writeData(getProcessSession(), message, partition);
+ });
+ }
+
+ totalMessages += messages.size();
+ uncommittedOffsetsMap.put(partition, new OffsetAndMetadata(maxOffset + 1L));
+ }
+ });
+ }
+
+ private static String encodeKafkaKey(final byte[] key, final String encoding) {
+ if (key == null) {
+ return null;
+ }
+
+ if (HEX_ENCODING.getValue().equals(encoding)) {
+ return DatatypeConverter.printHexBinary(key);
+ } else if (UTF8_ENCODING.getValue().equals(encoding)) {
+ return new String(key, StandardCharsets.UTF_8);
+ } else {
+ return null; // won't happen because it is guaranteed by the Allowable Values
+ }
+ }
+
+ private Collection<FlowFile> getBundles() throws IOException {
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ for (final BundleTracker tracker : bundleMap.values()) {
+ final boolean includeBundle = processBundle(tracker);
+ if (includeBundle) {
+ flowFiles.add(tracker.flowFile);
+ }
+ }
+ return flowFiles;
+ }
+
+ private boolean processBundle(final BundleTracker bundle) throws IOException {
+ final RecordSetWriter writer = bundle.recordWriter;
+ if (writer != null) {
+ final WriteResult writeResult;
+
+ try {
+ writeResult = writer.finishRecordSet();
+ } finally {
+ writer.close();
+ }
+
+ if (writeResult.getRecordCount() == 0) {
+ getProcessSession().remove(bundle.flowFile);
+ return false;
+ }
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.putAll(writeResult.getAttributes());
+ attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+
+ bundle.flowFile = getProcessSession().putAllAttributes(bundle.flowFile, attributes);
+ }
+
+ populateAttributes(bundle);
+ return true;
+ }
+
+ private void writeData(final ProcessSession session, ConsumerRecord<byte[], byte[]> record, final TopicPartition topicPartition) {
+ FlowFile flowFile = session.create();
+ final BundleTracker tracker = new BundleTracker(record, topicPartition, keyEncoding);
+ tracker.incrementRecordCount(1);
+ flowFile = session.write(flowFile, out -> {
+ out.write(record.value());
+ });
+ tracker.updateFlowFile(flowFile);
+ populateAttributes(tracker);
+ session.transfer(tracker.flowFile, REL_SUCCESS);
+ }
+
+ private void writeDemarcatedData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+ // Group the Records by their BundleInformation
+ final Map<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> map = records.stream()
+ .collect(Collectors.groupingBy(rec -> new BundleInformation(topicPartition, null, getAttributes(rec))));
+
+ for (final Map.Entry<BundleInformation, List<ConsumerRecord<byte[], byte[]>>> entry : map.entrySet()) {
+ final BundleInformation bundleInfo = entry.getKey();
+ final List<ConsumerRecord<byte[], byte[]>> recordList = entry.getValue();
+
+ final boolean demarcateFirstRecord;
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+
+ FlowFile flowFile;
+ if (tracker == null) {
+ tracker = new BundleTracker(recordList.get(0), topicPartition, keyEncoding);
+ flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, bundleInfo.attributes);
+ tracker.updateFlowFile(flowFile);
+ demarcateFirstRecord = false; //have not yet written records for this topic/partition in this lease
+ } else {
+ demarcateFirstRecord = true; //have already been writing records for this topic/partition in this lease
+ }
+ flowFile = tracker.flowFile;
+
+ tracker.incrementRecordCount(recordList.size());
+ flowFile = session.append(flowFile, out -> {
+ boolean useDemarcator = demarcateFirstRecord;
+ for (final ConsumerRecord<byte[], byte[]> record : recordList) {
+ if (useDemarcator) {
+ out.write(demarcatorBytes);
+ }
+ out.write(record.value());
+ useDemarcator = true;
+ }
+ });
+
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ }
+ }
+
+ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause) {
+ handleParseFailure(consumerRecord, session, cause, "Failed to parse message from Kafka using the configured Record Reader. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ }
+
+ private void handleParseFailure(final ConsumerRecord<byte[], byte[]> consumerRecord, final ProcessSession session, final Exception cause, final String message) {
+ // If we are unable to parse the data, we need to transfer it to 'parse failure' relationship
+ final Map<String, String> attributes = getAttributes(consumerRecord);
+ attributes.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(consumerRecord.offset()));
+ attributes.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(consumerRecord.partition()));
+ attributes.put(KafkaProcessorUtils.KAFKA_TOPIC, consumerRecord.topic());
+
+ FlowFile failureFlowFile = session.create();
+
+ failureFlowFile = session.write(failureFlowFile, out -> out.write(consumerRecord.value()));
+ failureFlowFile = session.putAllAttributes(failureFlowFile, attributes);
+
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, consumerRecord.topic());
+ session.getProvenanceReporter().receive(failureFlowFile, transitUri);
+
+ session.transfer(failureFlowFile, REL_PARSE_FAILURE);
+
+ if (cause == null) {
+ logger.error(message);
+ } else {
+ logger.error(message, cause);
+ }
+
+ session.adjustCounter("Parse Failures", 1, false);
+ }
+
+ private Map<String, String> getAttributes(final ConsumerRecord<?, ?> consumerRecord) {
+ final Map<String, String> attributes = new HashMap<>();
+ if (headerNamePattern == null) {
+ return attributes;
+ }
+
+ for (final Header header : consumerRecord.headers()) {
+ final String attributeName = header.key();
+ if (headerNamePattern.matcher(attributeName).matches()) {
+ attributes.put(attributeName, new String(header.value(), headerCharacterSet));
+ }
+ }
+
+ return attributes;
+ }
+
+ private void writeRecordData(final ProcessSession session, final List<ConsumerRecord<byte[], byte[]>> records, final TopicPartition topicPartition) {
+ // For each record received, obtain a RecordReader from the RecordReaderFactory using the
+ // record's header-derived attributes, and bundle records that share the same schema and
+ // attributes into a single FlowFile so that we do not create one FlowFile per Kafka
+ // message. Messages that cannot be parsed are routed to the 'parse.failure' relationship.
+ RecordSetWriter writer = null;
+ try {
+ for (final ConsumerRecord<byte[], byte[]> consumerRecord : records) {
+ final Map<String, String> attributes = getAttributes(consumerRecord);
+
+ final Record record;
+ try (final InputStream in = new ByteArrayInputStream(consumerRecord.value())) {
+ final RecordReader reader = readerFactory.createRecordReader(attributes, in, logger);
+ record = reader.nextRecord();
+ } catch (final Exception e) {
+ handleParseFailure(consumerRecord, session, e);
+ continue;
+ }
+
+ if (record == null) {
+ handleParseFailure(consumerRecord, session, null);
+ continue;
+ }
+
+ // Determine the bundle for this record.
+ final RecordSchema recordSchema = record.getSchema();
+ final BundleInformation bundleInfo = new BundleInformation(topicPartition, recordSchema, attributes);
+
+ BundleTracker tracker = bundleMap.get(bundleInfo);
+ if (tracker == null) {
+ FlowFile flowFile = session.create();
+ flowFile = session.putAllAttributes(flowFile, attributes);
+
+ final OutputStream rawOut = session.write(flowFile);
+
+ final RecordSchema writeSchema;
+ try {
+ writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
+ } catch (final Exception e) {
+ logger.error("Failed to obtain Schema for FlowFile. Will roll back the Kafka message offsets.", e);
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
+
+ yield();
+ throw new ProcessException(e);
+ }
+
+ writer = writerFactory.createWriter(logger, writeSchema, rawOut);
+ writer.beginRecordSet();
+
+ tracker = new BundleTracker(consumerRecord, topicPartition, keyEncoding, writer);
+ tracker.updateFlowFile(flowFile);
+ bundleMap.put(bundleInfo, tracker);
+ } else {
+ writer = tracker.recordWriter;
+ }
+
+ try {
+ writer.write(record);
+ } catch (final RuntimeException re) {
+ handleParseFailure(consumerRecord, session, re, "Failed to write message from Kafka using the configured Record Writer. "
+ + "Will route message as its own FlowFile to the 'parse.failure' relationship");
+ continue;
+ }
+
+ tracker.incrementRecordCount(1L);
+ }
+
+ session.adjustCounter("Records Received", records.size(), false);
+ } catch (final Exception e) {
+ logger.error("Failed to properly receive messages from Kafka. Will roll back session and any un-committed offsets from Kafka.", e);
+
+ try {
+ if (writer != null) {
+ writer.close();
+ }
+ } catch (final Exception ioe) {
+ logger.warn("Failed to close Record Writer", ioe);
+ }
+
+ try {
+ rollback(topicPartition);
+ } catch (final Exception rollbackException) {
+ logger.warn("Attempted to rollback Kafka message offset but was unable to do so", rollbackException);
+ }
+
+ throw new ProcessException(e);
+ }
+ }
+
+
+ private void rollback(final TopicPartition topicPartition) {
+ OffsetAndMetadata offsetAndMetadata = uncommittedOffsetsMap.get(topicPartition);
+ if (offsetAndMetadata == null) {
+ offsetAndMetadata = kafkaConsumer.committed(topicPartition);
+ }
+
+ final long offset = offsetAndMetadata.offset();
+ kafkaConsumer.seek(topicPartition, offset);
+ }
+
+
+
+ private void populateAttributes(final BundleTracker tracker) {
+ final Map<String, String> kafkaAttrs = new HashMap<>();
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_OFFSET, String.valueOf(tracker.initialOffset));
+ if (tracker.key != null && tracker.totalRecords == 1) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_KEY, tracker.key);
+ }
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_PARTITION, String.valueOf(tracker.partition));
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_TOPIC, tracker.topic);
+ if (tracker.totalRecords > 1) {
+ // Add a record.count attribute to remain consistent with other record-oriented processors. If not
+ // reading/writing records, then use "kafka.count" attribute.
+ if (tracker.recordWriter == null) {
+ kafkaAttrs.put(KafkaProcessorUtils.KAFKA_COUNT, String.valueOf(tracker.totalRecords));
+ } else {
+ kafkaAttrs.put("record.count", String.valueOf(tracker.totalRecords));
+ }
+ }
+ final FlowFile newFlowFile = getProcessSession().putAllAttributes(tracker.flowFile, kafkaAttrs);
+ final long executionDurationMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - leaseStartNanos);
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, tracker.topic);
+ getProcessSession().getProvenanceReporter().receive(newFlowFile, transitUri, executionDurationMillis);
+ tracker.updateFlowFile(newFlowFile);
+ }
+
+ private static class BundleTracker {
+
+ final long initialOffset;
+ final int partition;
+ final String topic;
+ final String key;
+ final RecordSetWriter recordWriter;
+ FlowFile flowFile;
+ long totalRecords = 0;
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding) {
+ this(initialRecord, topicPartition, keyEncoding, null);
+ }
+
+ private BundleTracker(final ConsumerRecord<byte[], byte[]> initialRecord, final TopicPartition topicPartition, final String keyEncoding, final RecordSetWriter recordWriter) {
+ this.initialOffset = initialRecord.offset();
+ this.partition = topicPartition.partition();
+ this.topic = topicPartition.topic();
+ this.recordWriter = recordWriter;
+ this.key = encodeKafkaKey(initialRecord.key(), keyEncoding);
+ }
+
+ private void incrementRecordCount(final long count) {
+ totalRecords += count;
+ }
+
+ private void updateFlowFile(final FlowFile flowFile) {
+ this.flowFile = flowFile;
+ }
+
+ }
+
+ private static class BundleInformation {
+ private final TopicPartition topicPartition;
+ private final RecordSchema schema;
+ private final Map<String, String> attributes;
+
+ public BundleInformation(final TopicPartition topicPartition, final RecordSchema schema, final Map<String, String> attributes) {
+ this.topicPartition = topicPartition;
+ this.schema = schema;
+ this.attributes = attributes;
+ }
+
+ @Override
+ public int hashCode() {
+ return 41 + 13 * topicPartition.hashCode() + ((schema == null) ? 0 : 13 * schema.hashCode()) + ((attributes == null) ? 0 : 13 * attributes.hashCode());
+ }
+
+ @Override
+ public boolean equals(final Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof BundleInformation)) {
+ return false;
+ }
+
+ final BundleInformation other = (BundleInformation) obj;
+ return Objects.equals(topicPartition, other.topicPartition) && Objects.equals(schema, other.schema) && Objects.equals(attributes, other.attributes);
+ }
+ }
+}
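
For orientation, a lease produced by this class is meant to be driven in the try-with-resources style its class javadoc describes: poll until the lease says to stop, then commit, then close. The sketch below is illustrative only and not part of this commit; the consumerPool field, the isScheduled() check, and the surrounding processor class are assumptions inferred from the lease API above.

    // Illustrative sketch only - "consumerPool" and "isScheduled()" are assumptions,
    // not part of this commit.
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
        try (final ConsumerLease lease = consumerPool.obtainConsumer(session, context)) {
            if (lease == null) {
                context.yield(); // no consumer currently available or necessary
                return;
            }
            // Poll until the lease reports that memory or time limits have been reached.
            while (isScheduled() && lease.continuePolling()) {
                lease.poll();
            }
            // Commit the NiFi session first, then the Kafka offsets (at-least-once);
            // if nothing was committed, back off.
            if (isScheduled() && !lease.commit()) {
                context.yield();
            }
        }
        // lease.close() rolls back any uncommitted session state and returns the
        // consumer to the pool, or destroys it if the lease was poisoned.
    }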
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
new file mode 100644
index 0000000..a7bd96d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+/**
+ * A pool of Kafka Consumers for a given topic. Consumers can be obtained by
+ * calling 'obtainConsumer'. Once a lease is closed, its consumer is returned
+ * to the pool, ready for immediate reuse.
+ */
+public class ConsumerPool implements Closeable {
+
+ private final BlockingQueue<SimpleConsumerLease> pooledLeases;
+ private final List<String> topics;
+ private final Pattern topicPattern;
+ private final Map<String, Object> kafkaProperties;
+ private final long maxWaitMillis;
+ private final ComponentLog logger;
+ private final byte[] demarcatorBytes;
+ private final String keyEncoding;
+ private final String securityProtocol;
+ private final String bootstrapServers;
+ private final boolean honorTransactions;
+ private final RecordReaderFactory readerFactory;
+ private final RecordSetWriterFactory writerFactory;
+ private final Charset headerCharacterSet;
+ private final Pattern headerNamePattern;
+ private final AtomicLong consumerCreatedCountRef = new AtomicLong();
+ private final AtomicLong consumerClosedCountRef = new AtomicLong();
+ private final AtomicLong leasesObtainedCountRef = new AtomicLong();
+
+ /**
+ * Creates a pool of KafkaConsumer objects that will grow up to the
+ * indicated maximum number of concurrent leases. Consumers are lazily
+ * initialized. We may elect not to create up to the maximum number of
+ * configured consumers if the broker-reported lag time for all topics is
+ * below a certain threshold.
+ *
+ * @param maxConcurrentLeases max allowable consumers at once
+ * @param demarcator bytes to use as demarcator between messages; null or
+ * empty means no demarcator
+ * @param kafkaProperties properties to use to initialize kafka consumers
+ * @param topics the topics to subscribe to
+ * @param maxWaitMillis maximum time to wait for a given lease to acquire
+ * data before committing
+ * @param keyEncoding the encoding to use for the key of a kafka message if
+ * found
+ * @param securityProtocol the security protocol used
+ * @param bootstrapServers the bootstrap servers
+ * @param logger the logger to report any errors/warnings
+ */
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final byte[] demarcator,
+ final Map<String, Object> kafkaProperties,
+ final List<String> topics,
+ final long maxWaitMillis,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger,
+ final boolean honorTransactions,
+ final Charset headerCharacterSet,
+ final Pattern headerNamePattern) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
+ this.logger = logger;
+ this.demarcatorBytes = demarcator;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = Collections.unmodifiableList(topics);
+ this.topicPattern = null;
+ this.readerFactory = null;
+ this.writerFactory = null;
+ this.honorTransactions = honorTransactions;
+ this.headerCharacterSet = headerCharacterSet;
+ this.headerNamePattern = headerNamePattern;
+ }
+
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final byte[] demarcator,
+ final Map<String, Object> kafkaProperties,
+ final Pattern topics,
+ final long maxWaitMillis,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger,
+ final boolean honorTransactions,
+ final Charset headerCharacterSet,
+ final Pattern headerNamePattern) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
+ this.logger = logger;
+ this.demarcatorBytes = demarcator;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = null;
+ this.topicPattern = topics;
+ this.readerFactory = null;
+ this.writerFactory = null;
+ this.honorTransactions = honorTransactions;
+ this.headerCharacterSet = headerCharacterSet;
+ this.headerNamePattern = headerNamePattern;
+ }
+
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
+ final Map<String, Object> kafkaProperties,
+ final Pattern topics,
+ final long maxWaitMillis,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger,
+ final boolean honorTransactions,
+ final Charset headerCharacterSet,
+ final Pattern headerNamePattern) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
+ this.logger = logger;
+ this.demarcatorBytes = null;
+ this.keyEncoding = null;
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = null;
+ this.topicPattern = topics;
+ this.honorTransactions = honorTransactions;
+ this.headerCharacterSet = headerCharacterSet;
+ this.headerNamePattern = headerNamePattern;
+ }
+
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final RecordReaderFactory readerFactory,
+ final RecordSetWriterFactory writerFactory,
+ final Map<String, Object> kafkaProperties,
+ final List<String> topics,
+ final long maxWaitMillis,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger,
+ final boolean honorTransactions,
+ final Charset headerCharacterSet,
+ final Pattern headerNamePattern) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
+ this.logger = logger;
+ this.demarcatorBytes = null;
+ this.keyEncoding = null;
+ this.readerFactory = readerFactory;
+ this.writerFactory = writerFactory;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = topics;
+ this.topicPattern = null;
+ this.honorTransactions = honorTransactions;
+ this.headerCharacterSet = headerCharacterSet;
+ this.headerNamePattern = headerNamePattern;
+ }
+
+ /**
+ * Obtains a consumer from the pool if one is available or lazily
+ * initializes a new one if deemed necessary.
+ *
+ * @param session the session for which the consumer lease will be
+ * associated
+ * @param processContext the ProcessContext for which the consumer
+ * lease will be associated
+ * @return consumer to use or null if not available or necessary
+ */
+ public ConsumerLease obtainConsumer(final ProcessSession session, final ProcessContext processContext) {
+ SimpleConsumerLease lease = pooledLeases.poll();
+ if (lease == null) {
+ final Consumer<byte[], byte[]> consumer = createKafkaConsumer();
+ consumerCreatedCountRef.incrementAndGet();
+ /**
+ * For now return a new consumer lease. But we could later elect to
+ * have this return null if we determine the broker indicates that
+ * the lag time on all topics being monitored is sufficiently low.
+ * For now we should encourage conservative use of threads because
+ * having too many means we'll have at best useless threads sitting
+ * around doing frequent network calls and at worst having consumers
+ * sitting idle which could prompt excessive rebalances.
+ */
+ lease = new SimpleConsumerLease(consumer);
+ /**
+ * This subscription tightly couples the lease to the given
+ * consumer. They cannot be separated from then on.
+ */
+ if (topics != null) {
+ consumer.subscribe(topics, lease);
+ } else {
+ consumer.subscribe(topicPattern, lease);
+ }
+ }
+ lease.setProcessSession(session, processContext);
+
+ leasesObtainedCountRef.incrementAndGet();
+ return lease;
+ }
+
+ /**
+ * Exposed as protected method for easier unit testing
+ *
+ * @return consumer
+ * @throws KafkaException if unable to subscribe to the given topics
+ */
+ protected Consumer<byte[], byte[]> createKafkaConsumer() {
+ final Map<String, Object> properties = new HashMap<>(kafkaProperties);
+ if (honorTransactions) {
+ properties.put("isolation.level", "read_committed");
+ } else {
+ properties.put("isolation.level", "read_uncommitted");
+ }
+ final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
+ return consumer;
+ }
+
+ /**
+ * Closes all consumers in the pool. Can be safely called repeatedly.
+ */
+ @Override
+ public void close() {
+ final List<SimpleConsumerLease> leases = new ArrayList<>();
+ pooledLeases.drainTo(leases);
+ leases.stream().forEach((lease) -> {
+ lease.close(true);
+ });
+ }
+
+ private void closeConsumer(final Consumer<?, ?> consumer) {
+ consumerClosedCountRef.incrementAndGet();
+ try {
+ consumer.unsubscribe();
+ } catch (Exception e) {
+ logger.warn("Failed while unsubscribing " + consumer, e);
+ }
+
+ try {
+ consumer.close();
+ } catch (Exception e) {
+ logger.warn("Failed while closing " + consumer, e);
+ }
+ }
+
+ PoolStats getPoolStats() {
+ return new PoolStats(consumerCreatedCountRef.get(), consumerClosedCountRef.get(), leasesObtainedCountRef.get());
+ }
+
+ private class SimpleConsumerLease extends ConsumerLease {
+
+ private final Consumer<byte[], byte[]> consumer;
+ private volatile ProcessSession session;
+ private volatile ProcessContext processContext;
+ private volatile boolean closedConsumer;
+
+ private SimpleConsumerLease(final Consumer<byte[], byte[]> consumer) {
+ super(maxWaitMillis, consumer, demarcatorBytes, keyEncoding, securityProtocol, bootstrapServers,
+ readerFactory, writerFactory, logger, headerCharacterSet, headerNamePattern);
+ this.consumer = consumer;
+ }
+
+ void setProcessSession(final ProcessSession session, final ProcessContext context) {
+ this.session = session;
+ this.processContext = context;
+ }
+
+ @Override
+ public void yield() {
+ if (processContext != null) {
+ processContext.yield();
+ }
+ }
+
+ @Override
+ public ProcessSession getProcessSession() {
+ return session;
+ }
+
+ @Override
+ public void close() {
+ super.close();
+ close(false);
+ }
+
+ public void close(final boolean forceClose) {
+ if (closedConsumer) {
+ return;
+ }
+ super.close();
+ if (session != null) {
+ session.rollback();
+ setProcessSession(null, null);
+ }
+ if (forceClose || isPoisoned() || !pooledLeases.offer(this)) {
+ closedConsumer = true;
+ closeConsumer(consumer);
+ }
+ }
+ }
+
+ static final class PoolStats {
+
+ final long consumerCreatedCount;
+ final long consumerClosedCount;
+ final long leasesObtainedCount;
+
+ PoolStats(
+ final long consumerCreatedCount,
+ final long consumerClosedCount,
+ final long leasesObtainedCount
+ ) {
+ this.consumerCreatedCount = consumerCreatedCount;
+ this.consumerClosedCount = consumerClosedCount;
+ this.leasesObtainedCount = leasesObtainedCount;
+ }
+
+ @Override
+ public String toString() {
+ return "Created Consumers [" + consumerCreatedCount + "]\n"
+ + "Closed Consumers [" + consumerClosedCount + "]\n"
+ + "Leases Obtained [" + leasesObtainedCount + "]\n";
+ }
+
+ }
+
+}
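
To make the constructor overloads above concrete, here is a minimal, hypothetical wiring of a pool for demarcated consumption. Every property value, the logger variable, and the choice of overload are illustrative assumptions, not part of this commit.

    // Hypothetical pool wiring; every value below is illustrative.
    final Map<String, Object> kafkaProps = new HashMap<>();
    kafkaProps.put("bootstrap.servers", "localhost:9092");
    kafkaProps.put("group.id", "nifi-group");
    kafkaProps.put("enable.auto.commit", "false"); // offsets are committed by the lease, never auto-committed
    kafkaProps.put("key.deserializer", ByteArrayDeserializer.class.getName());
    kafkaProps.put("value.deserializer", ByteArrayDeserializer.class.getName());

    final ConsumerPool pool = new ConsumerPool(
            4,                                        // maxConcurrentLeases
            "\n".getBytes(StandardCharsets.UTF_8),    // demarcator between appended messages
            kafkaProps,
            Collections.singletonList("my-topic"),    // topics (the List<String> overload)
            10_000L,                                  // maxWaitMillis before a lease must commit
            "utf-8",                                  // keyEncoding (the UTF8_ENCODING value)
            "PLAINTEXT",                              // securityProtocol
            "localhost:9092",                         // bootstrapServers
            logger,                                   // an assumed ComponentLog
            false,                                    // honorTransactions -> isolation.level=read_uncommitted
            StandardCharsets.UTF_8,                   // headerCharacterSet
            null);                                    // headerNamePattern: no headers copied to attributes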
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
new file mode 100644
index 0000000..317b274
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+
+public class InFlightMessageTracker {
+ private final ConcurrentMap<FlowFile, Counts> messageCountsByFlowFile = new ConcurrentHashMap<>();
+ private final ConcurrentMap<FlowFile, Exception> failures = new ConcurrentHashMap<>();
+ private final Object progressMutex = new Object();
+ private final ComponentLog logger;
+
+ public InFlightMessageTracker(final ComponentLog logger) {
+ this.logger = logger;
+ }
+
+ public void incrementAcknowledgedCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+ counter.incrementAcknowledgedCount();
+
+ synchronized (progressMutex) {
+ progressMutex.notify();
+ }
+ }
+
+ public void trackEmpty(final FlowFile flowFile) {
+ messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+ }
+
+ public int getAcknowledgedCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.get(flowFile);
+ return (counter == null) ? 0 : counter.getAcknowledgedCount();
+ }
+
+ public void incrementSentCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.computeIfAbsent(flowFile, ff -> new Counts());
+ counter.incrementSentCount();
+ }
+
+ public int getSentCount(final FlowFile flowFile) {
+ final Counts counter = messageCountsByFlowFile.get(flowFile);
+ return (counter == null) ? 0 : counter.getSentCount();
+ }
+
+ public void fail(final FlowFile flowFile, final Exception exception) {
+ failures.putIfAbsent(flowFile, exception);
+ logger.error("Failed to send " + flowFile + " to Kafka", exception);
+
+ synchronized (progressMutex) {
+ progressMutex.notify();
+ }
+ }
+
+ public Exception getFailure(final FlowFile flowFile) {
+ return failures.get(flowFile);
+ }
+
+ public boolean isFailed(final FlowFile flowFile) {
+ return getFailure(flowFile) != null;
+ }
+
+ public void reset() {
+ messageCountsByFlowFile.clear();
+ failures.clear();
+ }
+
+ public PublishResult failOutstanding(final Exception exception) {
+ messageCountsByFlowFile.keySet().stream()
+ .filter(ff -> !isComplete(ff))
+ .filter(ff -> !failures.containsKey(ff))
+ .forEach(ff -> failures.put(ff, exception));
+
+ return createPublishResult();
+ }
+
+ private boolean isComplete(final FlowFile flowFile) {
+ final Counts counts = messageCountsByFlowFile.get(flowFile);
+ if (counts.getAcknowledgedCount() == counts.getSentCount()) {
+ // all messages received successfully.
+ return true;
+ }
+
+ if (failures.containsKey(flowFile)) {
+ // FlowFile failed so is complete
+ return true;
+ }
+
+ return false;
+ }
+
+ private boolean isComplete() {
+ return messageCountsByFlowFile.keySet().stream()
+ .allMatch(flowFile -> isComplete(flowFile));
+ }
+
+ void awaitCompletion(final long millis) throws InterruptedException, TimeoutException {
+ final long startTime = System.nanoTime();
+ final long maxTime = startTime + TimeUnit.MILLISECONDS.toNanos(millis);
+
+ while (System.nanoTime() < maxTime) {
+ synchronized (progressMutex) {
+ if (isComplete()) {
+ return;
+ }
+
+ progressMutex.wait(millis);
+ }
+ }
+
+ throw new TimeoutException();
+ }
+
+
+ PublishResult createPublishResult() {
+ return new PublishResult() {
+ @Override
+ public boolean isFailure() {
+ return !failures.isEmpty();
+ }
+
+ @Override
+ public int getSuccessfulMessageCount(final FlowFile flowFile) {
+ return getAcknowledgedCount(flowFile);
+ }
+
+ @Override
+ public Exception getReasonForFailure(final FlowFile flowFile) {
+ return getFailure(flowFile);
+ }
+ };
+ }
+
+ public static class Counts {
+ private final AtomicInteger sentCount = new AtomicInteger(0);
+ private final AtomicInteger acknowledgedCount = new AtomicInteger(0);
+
+ public void incrementSentCount() {
+ sentCount.incrementAndGet();
+ }
+
+ public void incrementAcknowledgedCount() {
+ acknowledgedCount.incrementAndGet();
+ }
+
+ public int getAcknowledgedCount() {
+ return acknowledgedCount.get();
+ }
+
+ public int getSentCount() {
+ return sentCount.get();
+ }
+ }
+}
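
The tracker above pairs with an asynchronous Kafka producer send: the publisher increments the sent count before each send, the completion callback acknowledges or fails it, and awaitCompletion() blocks on the progress mutex until every in-flight message resolves. A minimal sketch follows; "producer", "flowFile", "logger", and the payload are assumptions, and since awaitCompletion() and createPublishResult() are package-private this would live alongside the tracker.

    // Minimal sketch; the producer, flowFile, logger, and payload are assumed to be in scope.
    final InFlightMessageTracker tracker = new InFlightMessageTracker(logger);

    tracker.incrementSentCount(flowFile); // count the message before handing it to Kafka
    producer.send(new ProducerRecord<byte[], byte[]>("my-topic", payload), (metadata, exception) -> {
        if (exception == null) {
            tracker.incrementAcknowledgedCount(flowFile); // notifies awaitCompletion() waiters
        } else {
            tracker.fail(flowFile, exception);            // also notifies waiters
        }
    });

    PublishResult result;
    try {
        tracker.awaitCompletion(5_000L);   // block until all sent messages are acked or failed
        result = tracker.createPublishResult();
    } catch (final InterruptedException | TimeoutException e) {
        result = tracker.failOutstanding(e); // mark anything still unresolved as failed
    }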
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
new file mode 100644
index 0000000..de28995
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.ssl.SSLContextService;
+import org.apache.nifi.util.FormatUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class KafkaProcessorUtils {
+
+ final Logger logger = LoggerFactory.getLogger(this.getClass());
+
+ static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+ static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+ "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
+
+ static final Pattern HEX_KEY_PATTERN = Pattern.compile("(?:[0123456789abcdefABCDEF]{2})+");
+
+ static final String KAFKA_KEY = "kafka.key";
+ static final String KAFKA_TOPIC = "kafka.topic";
+ static final String KAFKA_PARTITION = "kafka.partition";
+ static final String KAFKA_OFFSET = "kafka.offset";
+ static final String KAFKA_COUNT = "kafka.count";
+ static final AllowableValue SEC_PLAINTEXT = new AllowableValue("PLAINTEXT", "PLAINTEXT", "PLAINTEXT");
+ static final AllowableValue SEC_SSL = new AllowableValue("SSL", "SSL", "SSL");
+ static final AllowableValue SEC_SASL_PLAINTEXT = new AllowableValue("SASL_PLAINTEXT", "SASL_PLAINTEXT", "SASL_PLAINTEXT");
+ static final AllowableValue SEC_SASL_SSL = new AllowableValue("SASL_SSL", "SASL_SSL", "SASL_SSL");
+
+ static final PropertyDescriptor BOOTSTRAP_SERVERS = new PropertyDescriptor.Builder()
+ .name(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
+ .displayName("Kafka Brokers")
+ .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+ .required(true)
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("localhost:9092")
+ .build();
+ static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
+ .name("security.protocol")
+ .displayName("Security Protocol")
+ .description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
+ .defaultValue(SEC_PLAINTEXT.getValue())
+ .build();
+ static final PropertyDescriptor KERBEROS_PRINCIPLE = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.service.name")
+ .displayName("Kerberos Service Name")
+ .description("The Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config. "
+ + "Corresponds to Kafka's 'security.protocol' property."
+ + "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.principal")
+ .displayName("Kerberos Principal")
+ .description("The Kerberos principal that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+ + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
+ .name("sasl.kerberos.keytab")
+ .displayName("Kerberos Keytab")
+ .description("The Kerberos keytab that will be used to connect to brokers. If not set, it is expected to set a JAAS configuration file "
+ + "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
+ .required(false)
+ .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .build();
+ static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
+ .name("ssl.context.service")
+ .displayName("SSL Context Service")
+ .description("Specifies the SSL Context Service to use for communicating with Kafka.")
+ .required(false)
+ .identifiesControllerService(SSLContextService.class)
+ .build();
+
+ static List<PropertyDescriptor> getCommonPropertyDescriptors() {
+ return Arrays.asList(
+ BOOTSTRAP_SERVERS,
+ SECURITY_PROTOCOL,
+ KERBEROS_PRINCIPLE,
+ USER_PRINCIPAL,
+ USER_KEYTAB,
+ SSL_CONTEXT_SERVICE
+ );
+ }
+
+ static Collection<ValidationResult> validateCommonProperties(final ValidationContext validationContext) {
+ List<ValidationResult> results = new ArrayList<>();
+
+ String securityProtocol = validationContext.getProperty(SECURITY_PROTOCOL).getValue();
+
+ /*
+ * validates that if one of the SASL (Kerberos) options is selected for
+ * the security protocol, then a Kerberos principal is provided as well
+ */
+ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+ String kerberosPrincipal = validationContext.getProperty(KERBEROS_PRINCIPLE).getValue();
+ if (kerberosPrincipal == null || kerberosPrincipal.trim().length() == 0) {
+ results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+ .explanation("The <" + KERBEROS_PRINCIPLE.getDisplayName() + "> property must be set when <"
+ + SECURITY_PROTOCOL.getDisplayName() + "> is configured as '"
+ + SEC_SASL_PLAINTEXT.getValue() + "' or '" + SEC_SASL_SSL.getValue() + "'.")
+ .build());
+ }
+
+ String userKeytab = validationContext.getProperty(USER_KEYTAB).getValue();
+ String userPrincipal = validationContext.getProperty(USER_PRINCIPAL).getValue();
+ if((StringUtils.isBlank(userKeytab) && !StringUtils.isBlank(userPrincipal))
+ || (!StringUtils.isBlank(userKeytab) && StringUtils.isBlank(userPrincipal))) {
+ results.add(new ValidationResult.Builder().subject(KERBEROS_PRINCIPLE.getDisplayName()).valid(false)
+ .explanation("Both <" + USER_KEYTAB.getDisplayName() + "> and <" + USER_PRINCIPAL.getDisplayName() + "> "
+ + "must be set.")
+ .build());
+ }
+ }
+
+ //If SSL or SASL_SSL then CS must be set.
+ final boolean sslProtocol = SEC_SSL.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol);
+ final boolean csSet = validationContext.getProperty(SSL_CONTEXT_SERVICE).isSet();
+ if (csSet && !sslProtocol) {
+ results.add(new ValidationResult.Builder().subject(SECURITY_PROTOCOL.getDisplayName()).valid(false)
+ .explanation("If you set the SSL Controller Service you should also choose an SSL based security protocol.").build());
+ }
+ if (!csSet && sslProtocol) {
+ results.add(new ValidationResult.Builder().subject(SSL_CONTEXT_SERVICE.getDisplayName()).valid(false)
+ .explanation("If you set to an SSL based protocol you need to set the SSL Controller Service").build());
+ }
+
+ // enable.auto.commit can only arrive as a user-supplied dynamic property; the processors manage offset commits themselves
+ final String enableAutoCommit = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).build()).getValue();
+ if (enableAutoCommit != null && !"false".equalsIgnoreCase(enableAutoCommit)) {
+ results.add(new ValidationResult.Builder().subject(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).valid(false)
+ .explanation("Enable Auto Commit must be 'false'; offset commits are managed by the processor.").build());
+ }
+
+ final String keySerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).build()).getValue();
+ if (keySerializer != null && !ByteArraySerializer.class.getName().equals(keySerializer)) {
+ results.add(new ValidationResult.Builder().subject(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).valid(false)
+ .explanation("Key Serializer must be '" + ByteArraySerializer.class.getName() + "' but was '" + keySerializer + "'").build());
+ }
+
+ final String valueSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).build()).getValue();
+ if (valueSerializer != null && !ByteArraySerializer.class.getName().equals(valueSerializer)) {
+ results.add(new ValidationResult.Builder().subject(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).valid(false)
+ .explanation("Value Serializer must be '" + ByteArraySerializer.class.getName() + "' but was '" + valueSerializer + "'").build());
+ }
+
+ final String keyDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+ if (keyDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(keyDeSerializer)) {
+ results.add(new ValidationResult.Builder().subject(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG).valid(false)
+ .explanation("Key De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' but was '" + keyDeSerializer + "'").build());
+ }
+
+ final String valueDeSerializer = validationContext.getProperty(new PropertyDescriptor.Builder().name(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).build()).getValue();
+ if (valueDeSerializer != null && !ByteArrayDeserializer.class.getName().equals(valueDeSerializer)) {
+ results.add(new ValidationResult.Builder().subject(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG).valid(false)
+ .explanation("Value De-Serializer must be '" + ByteArrayDeserializer.class.getName() + "' but was '" + valueDeSerializer + "'").build());
+ }
+
+ return results;
+ }
+
+ static final class KafkaConfigValidator implements Validator {
+
+ final Class<?> classType;
+
+ public KafkaConfigValidator(final Class<?> classType) {
+ this.classType = classType;
+ }
+
+ @Override
+ public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
+ final boolean knownValue = KafkaProcessorUtils.isStaticStringFieldNamePresent(subject, classType, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class);
+ return new ValidationResult.Builder().subject(subject).explanation("Must be a known configuration parameter for this Kafka client").valid(knownValue).build();
+ }
+ }
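+
+ // Usage sketch (illustrative, not part of this file): this validator is typically attached to
+ // dynamic properties so that only known Kafka client configuration keys are accepted, e.g.:
+ // new PropertyDescriptor.Builder().name(propertyName).dynamic(true)
+ // .addValidator(new KafkaConfigValidator(ConsumerConfig.class)).build();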
+
+ /**
+ * Builds the transit URI for a provenance event. The transit URI will be in the
+ * form of {@code <security.protocol>://<bootstrap.servers>/<topic>}.
+ */
+ static String buildTransitURI(String securityProtocol, String brokers, String topic) {
+ StringBuilder builder = new StringBuilder();
+ builder.append(securityProtocol);
+ builder.append("://");
+ builder.append(brokers);
+ builder.append("/");
+ builder.append(topic);
+ return builder.toString();
+ }
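+
+ // For example (illustrative values): buildTransitURI("SASL_SSL", "broker-1:9092,broker-2:9092", "logs")
+ // yields "SASL_SSL://broker-1:9092,broker-2:9092/logs".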
+
+
+ static void buildCommonKafkaProperties(final ProcessContext context, final Class<?> kafkaConfigClass, final Map<String, Object> mapToPopulate) {
+ for (PropertyDescriptor propertyDescriptor : context.getProperties().keySet()) {
+ if (propertyDescriptor.equals(SSL_CONTEXT_SERVICE)) {
+ // Translate SSLContext Service configuration into Kafka properties
+ final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
+ if (sslContextService != null && sslContextService.isKeyStoreConfigured()) {
+ mapToPopulate.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, sslContextService.getKeyStoreFile());
+ mapToPopulate.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, sslContextService.getKeyStorePassword());
+ final String keyPass = sslContextService.getKeyPassword() == null ? sslContextService.getKeyStorePassword() : sslContextService.getKeyPassword();
+ mapToPopulate.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, keyPass);
+ mapToPopulate.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, sslContextService.getKeyStoreType());
+ }
+
+ if (sslContextService != null && sslContextService.isTrustStoreConfigured()) {
+ mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, sslContextService.getTrustStoreFile());
+ mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslContextService.getTrustStorePassword());
+ mapToPopulate.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, sslContextService.getTrustStoreType());
+ }
+ }
+
+ String propertyName = propertyDescriptor.getName();
+ String propertyValue = propertyDescriptor.isExpressionLanguageSupported()
+ ? context.getProperty(propertyDescriptor).evaluateAttributeExpressions().getValue()
+ : context.getProperty(propertyDescriptor).getValue();
+
+ if (propertyValue != null && !propertyName.equals(USER_PRINCIPAL.getName()) && !propertyName.equals(USER_KEYTAB.getName())) {
+ // If the property name ends in ".ms" then it is a time period. We want to accept either an integer as number of milliseconds
+ // or the standard NiFi time period such as "5 secs"
+ if (propertyName.endsWith(".ms") && !StringUtils.isNumeric(propertyValue.trim())) { // kafka standard time notation
+ propertyValue = String.valueOf(FormatUtils.getTimeDuration(propertyValue.trim(), TimeUnit.MILLISECONDS));
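+ // e.g. a NiFi time period of "5 secs" supplied for "request.timeout.ms" becomes "5000"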
+ }
+
+ if (isStaticStringFieldNamePresent(propertyName, kafkaConfigClass, CommonClientConfigs.class, SslConfigs.class, SaslConfigs.class)) {
+ mapToPopulate.put(propertyName, propertyValue);
+ }
+ }
+ }
+
+ String securityProtocol = context.getProperty(SECURITY_PROTOCOL).getValue();
+ if (SEC_SASL_PLAINTEXT.getValue().equals(securityProtocol) || SEC_SASL_SSL.getValue().equals(securityProtocol)) {
+ setJaasConfig(mapToPopulate, context);
+ }
+ }
+
+ /**
+ * Configures the 'sasl.jaas.config' property per KAFKA-4259 (KIP-85, Dynamic JAAS
+ * configuration for Kafka clients):<br />
+ * https://cwiki.apache.org/confluence/display/KAFKA/KIP-85%3A+Dynamic+JAAS+configuration+for+Kafka+clients<br />
+ * <br />
+ * The expected format is:<br />
+ * {@code <LoginModuleClass> <ControlFlag> *(<OptionName>=<OptionValue>);}<br />
+ * where ControlFlag is one of: required / requisite / sufficient / optional
+ *
+ * @param mapToPopulate Map of configuration properties
+ * @param context Context
+ */
+ private static void setJaasConfig(Map<String, Object> mapToPopulate, ProcessContext context) {
+ String keytab = context.getProperty(USER_KEYTAB).getValue();
+ String principal = context.getProperty(USER_PRINCIPAL).getValue();
+ String serviceName = context.getProperty(KERBEROS_PRINCIPLE).getValue();
+ if (StringUtils.isNotBlank(keytab) && StringUtils.isNotBlank(principal) && StringUtils.isNotBlank(serviceName)) {
+ mapToPopulate.put(SaslConfigs.SASL_JAAS_CONFIG, "com.sun.security.auth.module.Krb5LoginModule required "
+ + "useTicketCache=false "
+ + "renewTicket=true "
+ + "serviceName=\"" + serviceName + "\" "
+ + "useKeyTab=true "
+ + "keyTab=\"" + keytab + "\" "
+ + "principal=\"" + principal + "\";");
+ }
+ }
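+
+ // With illustrative values (service name "kafka", keytab "/etc/security/keytabs/nifi.keytab",
+ // principal "nifi@EXAMPLE.COM"), the resulting 'sasl.jaas.config' value would be:
+ // com.sun.security.auth.module.Krb5LoginModule required useTicketCache=false renewTicket=true
+ // serviceName="kafka" useKeyTab=true keyTab="/etc/security/keytabs/nifi.keytab" principal="nifi@EXAMPLE.COM";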
+
+ private static boolean isStaticStringFieldNamePresent(final String name, final Class<?>... classes) {
+ return KafkaProcessorUtils.getPublicStaticStringFieldValues(classes).contains(name);
+ }
+
+ private static Set<String> getPublicStaticStringFieldValues(final Class<?>... classes) {
+ final Set<String> strings = new HashSet<>();
+ for (final Class<?> classType : classes) {
+ for (final Field field : classType.getDeclaredFields()) {
+ if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers()) && field.getType().equals(String.class)) {
+ try {
+ strings.add(String.valueOf(field.get(null)));
+ } catch (IllegalArgumentException | IllegalAccessException ex) {
+ //ignore
+ }
+ }
+ }
+ }
+ return strings;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
new file mode 100644
index 0000000..64ab4ce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/Partitioners.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.util.Map;
+
+import org.apache.kafka.clients.producer.Partitioner;
+import org.apache.kafka.common.Cluster;
+
+/**
+ * Collection of implementations of common Kafka {@link Partitioner}s.
+ */
+public final class Partitioners {
+
+ private Partitioners() {
+ }
+
+ /**
+ * A {@link Partitioner} that implements a 'round-robin' scheme, evenly
+ * distributing load across all available partitions.
+ */
+ public static class RoundRobinPartitioner implements Partitioner {
+
+ private volatile int index;
+
+ @Override
+ public void configure(Map<String, ?> configs) {
+ // noop
+ }
+
+ @Override
+ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
+ return this.next(cluster.availablePartitionsForTopic(topic).size());
+ }
+
+ @Override
+ public void close() {
+ // noop
+ }
+
+ private synchronized int next(int numberOfPartitions) {
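+ // wrap the counter back to zero once it reaches the number of available partitions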
+ if (this.index >= numberOfPartitions) {
+ this.index = 0;
+ }
+ return index++;
+ }
+ }
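+
+ // Usage sketch (illustrative): a producer registers the partitioner through its configuration, e.g.:
+ // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioners.RoundRobinPartitioner.class.getName());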
+}