Posted to commits@nifi.apache.org by jo...@apache.org on 2017/09/23 02:56:36 UTC
[3/5] nifi git commit: NIFI-4201: This closes #2024. Implementation
of processors for interacting with Kafka 0.11
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
new file mode 100644
index 0000000..e015d9d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+@Tags({"Apache", "Kafka", "Record", "csv", "json", "avro", "logs", "Put", "Send", "Message", "PubSub", "0.11.x"})
+@CapabilityDescription("Sends the contents of a FlowFile as individual records to Apache Kafka using the Kafka 0.11.x Producer API. "
+ + "The contents of the FlowFile are expected to be record-oriented data that can be read by the configured Record Reader. "
+ + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the meantime"
+ + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. The complementary NiFi "
+ + "processor for fetching messages is ConsumeKafka_0_11_Record.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+ description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ + "FlowFiles that are routed to success.")
+@SeeAlso({PublishKafka_0_11.class, ConsumeKafka_0_11.class, ConsumeKafkaRecord_0_11.class})
+public class PublishKafkaRecord_0_11 extends AbstractProcessor {
+ protected static final String MSG_COUNT = "msg.count";
+
+ static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
+ "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+ + "number of Kafka Nodes according to the Topic configuration");
+ static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
+ "FlowFile will be routed to success if the message is received by a single Kafka node, "
+ + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
+ + "but can result in data loss if a Kafka node crashes");
+ static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
+ "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+ + "without waiting for a response. This provides the best performance but may result in data loss.");
+
+ static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
+ Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+ "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ + "the next Partition to Partition 2, and so on, wrapping as necessary.");
+ static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+ "DefaultPartitioner", "Messages will be assigned to random partitions.");
+
+ static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+ static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+ "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
+ static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+ .name("topic")
+ .displayName("Topic Name")
+ .description("The name of the Kafka Topic to publish to.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+ .name("record-reader")
+ .displayName("Record Reader")
+ .description("The Record Reader to use for incoming FlowFiles")
+ .identifiesControllerService(RecordReaderFactory.class)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+ .name("record-writer")
+ .displayName("Record Writer")
+ .description("The Record Writer to use in order to serialize the data before sending to Kafka")
+ .identifiesControllerService(RecordSetWriterFactory.class)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .build();
+
+ static final PropertyDescriptor MESSAGE_KEY_FIELD = new PropertyDescriptor.Builder()
+ .name("message-key-field")
+ .displayName("Message Key Field")
+ .description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+ .name("acks")
+ .displayName("Delivery Guarantee")
+ .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+ .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+ .build();
+
+ static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+ .name("max.block.ms")
+ .displayName("Max Metadata Wait Time")
+ .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+ + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("5 sec")
+ .build();
+
+ static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+ .name("ack.wait.time")
+ .displayName("Acknowledgment Wait Time")
+ .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .defaultValue("5 secs")
+ .build();
+
+ static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+ .name("max.request.size")
+ .displayName("Max Request Size")
+ .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
+
+ static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+ .name("partitioner.class")
+ .displayName("Partitioner class")
+ .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .defaultValue(RANDOM_PARTITIONING.getValue())
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+ .name("compression.type")
+ .displayName("Compression Type")
+ .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("none", "gzip", "snappy", "lz4")
+ .defaultValue("none")
+ .build();
+
+ static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
+ .name("attribute-name-regex")
+ .displayName("Attributes to Send as Headers (Regex)")
+ .description("A Regular Expression that is matched against all FlowFile attribute names. "
+ + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+ + "If not specified, no FlowFile attributes will be added as headers.")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(false)
+ .build();
+ static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
+ .name("use-transactions")
+ .displayName("Use Transactions")
+ .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
+ + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
+ + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
+ + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
+ .expressionLanguageSupported(false)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+ static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+ .name("message-header-encoding")
+ .displayName("Message Header Encoding")
+ .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
+ + "this property indicates the Character Encoding to use for serializing the headers.")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .defaultValue("UTF-8")
+ .required(false)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles for which all content was sent to Kafka.")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES;
+ private static final Set<Relationship> RELATIONSHIPS;
+
+ private volatile PublisherPool publisherPool = null;
+
+ static {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(KafkaProcessorUtils.BOOTSTRAP_SERVERS);
+ properties.add(TOPIC);
+ properties.add(RECORD_READER);
+ properties.add(RECORD_WRITER);
+ properties.add(USE_TRANSACTIONS);
+ properties.add(DELIVERY_GUARANTEE);
+ properties.add(ATTRIBUTE_NAME_REGEX);
+ properties.add(MESSAGE_HEADER_ENCODING);
+ properties.add(KafkaProcessorUtils.SECURITY_PROTOCOL);
+ properties.add(KafkaProcessorUtils.JAAS_SERVICE_NAME);
+ properties.add(KafkaProcessorUtils.USER_PRINCIPAL);
+ properties.add(KafkaProcessorUtils.USER_KEYTAB);
+ properties.add(KafkaProcessorUtils.SSL_CONTEXT_SERVICE);
+ properties.add(MESSAGE_KEY_FIELD);
+ properties.add(MAX_REQUEST_SIZE);
+ properties.add(ACK_WAIT_TIME);
+ properties.add(METADATA_WAIT_TIME);
+ properties.add(PARTITION_CLASS);
+ properties.add(COMPRESSION_CODEC);
+
+ PROPERTIES = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+ .name(propertyDescriptorName)
+ .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+ results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+
+ final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
+ if (useTransactions) {
+ final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
+ if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
+ results.add(new ValidationResult.Builder()
+ .subject("Delivery Guarantee")
+ .valid(false)
+ .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" "
+ + "Either change the <Use Transactions> property or the <Delivery Guarantee> property.")
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+ PublisherPool pool = publisherPool;
+ if (pool != null) {
+ return pool;
+ }
+
+ return publisherPool = createPublisherPool(context);
+ }
+
+ protected PublisherPool createPublisherPool(final ProcessContext context) {
+ final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+ final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+ final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
+ final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
+ final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+ final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
+ final Charset charset = Charset.forName(charsetName);
+
+ final Map<String, Object> kafkaProperties = new HashMap<>();
+ KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+ kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+ return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, attributeNamePattern, charset);
+ }
+
+ @OnStopped
+ public void closePool() {
+ if (publisherPool != null) {
+ publisherPool.close();
+ }
+
+ publisherPool = null;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final PublisherPool pool = getPublisherPool(context);
+ if (pool == null) {
+ context.yield();
+ return;
+ }
+
+ final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+ final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+ final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+ final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+ final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+ final long startTime = System.nanoTime();
+ try (final PublisherLease lease = pool.obtainPublisher()) {
+ if (useTransactions) {
+ lease.beginTransaction();
+ }
+
+ // Send each FlowFile to Kafka asynchronously.
+ for (final FlowFile flowFile : flowFiles) {
+ if (!isScheduled()) {
+ // If stopped, re-queue FlowFile instead of sending it
+ if (useTransactions) {
+ session.rollback();
+ lease.rollback();
+ return;
+ }
+
+ session.transfer(flowFile);
+ continue;
+ }
+
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ final String messageKeyField = context.getProperty(MESSAGE_KEY_FIELD).evaluateAttributeExpressions(flowFile).getValue();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ try (final InputStream in = new BufferedInputStream(rawIn)) {
+ final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger());
+ final RecordSet recordSet = reader.createRecordSet();
+
+ final RecordSchema schema = writerFactory.getSchema(flowFile.getAttributes(), recordSet.getSchema());
+ lease.publish(flowFile, recordSet, writerFactory, schema, messageKeyField, topic);
+ } catch (final SchemaNotFoundException | MalformedRecordException e) {
+ throw new ProcessException(e);
+ }
+ }
+ });
+ } catch (final Exception e) {
+ // The FlowFile will be obtained and the error logged below, when calling publishResult.getFailedFlowFiles()
+ lease.fail(flowFile, e);
+ continue;
+ }
+ }
+
+ // Complete the send
+ final PublishResult publishResult = lease.complete();
+
+ if (publishResult.isFailure()) {
+ getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+ session.transfer(flowFiles, REL_FAILURE);
+ return;
+ }
+
+ // Transfer any successful FlowFiles.
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+ for (FlowFile success : flowFiles) {
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+
+ final int msgCount = publishResult.getSuccessfulMessageCount(success);
+ success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+ session.adjustCounter("Messages Sent", msgCount, true);
+
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+ session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+ session.transfer(success, REL_SUCCESS);
+ }
+ }
+ }
+}
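
For readers skimming the diff: the per-record send that PublishKafkaRecord_0_11 performs reduces to the raw Kafka 0.11 Producer API shown below. This is a minimal standalone sketch, not part of the commit; the broker address, topic, and payload are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class PerRecordSendSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("acks", "all"); // the <Guarantee Replicated Delivery> setting

        try (final Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // Each record read by the Record Reader is serialized by the Record
            // Writer and becomes exactly one Kafka message, as in
            // PublisherLease.publish(FlowFile, RecordSet, ...).
            final byte[] serializedRecord = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
            producer.send(new ProducerRecord<>("my-topic", serializedRecord));
            producer.flush();
        }
    }
}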
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
new file mode 100644
index 0000000..6f33150
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
@@ -0,0 +1,461 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import javax.xml.bind.DatatypeConverter;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.FlowFileFilters;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.11.x"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka using the Kafka 0.11.x Producer API."
+ + "The messages to send may be individual FlowFiles or may be delimited, using a "
+ + "user-specified delimiter, such as a new-line. "
+ + " Please note there are cases where the publisher can get into an indefinite stuck state. We are closely monitoring"
+ + " how this evolves in the Kafka community and will take advantage of those fixes as soon as we can. In the meantime"
+ + " it is possible to enter states where the only resolution will be to restart the JVM NiFi runs on. "
+ + "The complementary NiFi processor for fetching messages is ConsumeKafka_0_11.")
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+ description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+ + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
+ + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+@WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
+ + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
+ + "be greater than 1.")
+public class PublishKafka_0_11 extends AbstractProcessor {
+ protected static final String MSG_COUNT = "msg.count";
+
+ static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
+ "FlowFile will be routed to failure unless the message is replicated to the appropriate "
+ + "number of Kafka Nodes according to the Topic configuration");
+ static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
+ "FlowFile will be routed to success if the message is received by a single Kafka node, "
+ + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
+ + "but can result in data loss if a Kafka node crashes");
+ static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
+ "FlowFile will be routed to success after successfully writing the content to a Kafka node, "
+ + "without waiting for a response. This provides the best performance but may result in data loss.");
+
+ static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue(Partitioners.RoundRobinPartitioner.class.getName(),
+ Partitioners.RoundRobinPartitioner.class.getSimpleName(),
+ "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ + "the next Partition to Partition 2, and so on, wrapping as necessary.");
+ static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("org.apache.kafka.clients.producer.internals.DefaultPartitioner",
+ "DefaultPartitioner", "Messages will be assigned to random partitions.");
+
+ static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
+ static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
+ "The key is interpreted as arbitrary binary data that is encoded using hexadecimal characters with uppercase letters.");
+
+ static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+ .name("topic")
+ .displayName("Topic Name")
+ .description("The name of the Kafka Topic to publish to.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+ .name(ProducerConfig.ACKS_CONFIG)
+ .displayName("Delivery Guarantee")
+ .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
+ .required(true)
+ .expressionLanguageSupported(false)
+ .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+ .defaultValue(DELIVERY_BEST_EFFORT.getValue())
+ .build();
+
+ static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
+ .name(ProducerConfig.MAX_BLOCK_MS_CONFIG)
+ .displayName("Max Metadata Wait Time")
+ .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
+ + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
+ .required(true)
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .defaultValue("5 sec")
+ .build();
+
+ static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
+ .name("ack.wait.time")
+ .displayName("Acknowledgment Wait Time")
+ .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(true)
+ .defaultValue("5 secs")
+ .build();
+
+ static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
+ .name("max.request.size")
+ .displayName("Max Request Size")
+ .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
+ .required(true)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .defaultValue("1 MB")
+ .build();
+
+ static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+ .name("kafka-key")
+ .displayName("Kafka Key")
+ .description("The Key to use for the Message. "
+ + "If not specified, the flow file attribute 'kafka.key' is used as the message key, if it is present "
+ + "and we're not demarcating.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .build();
+
+ static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
+ .name("key-attribute-encoding")
+ .displayName("Key Attribute Encoding")
+ .description("FlowFiles that are emitted have an attribute named '" + KafkaProcessorUtils.KAFKA_KEY + "'. This property dictates how the value of the attribute should be encoded.")
+ .required(true)
+ .defaultValue(UTF8_ENCODING.getValue())
+ .allowableValues(UTF8_ENCODING, HEX_ENCODING)
+ .build();
+
+ static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+ .name("message-demarcator")
+ .displayName("Message Demarcator")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(true)
+ .description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+ + "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+ + "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
+ + "To enter special character such as 'new line' use CTRL+Enter or Shift+Enter, depending on your OS.")
+ .build();
+
+ static final PropertyDescriptor PARTITION_CLASS = new PropertyDescriptor.Builder()
+ .name(ProducerConfig.PARTITIONER_CLASS_CONFIG)
+ .displayName("Partitioner class")
+ .description("Specifies which class to use to compute a partition id for a message. Corresponds to Kafka's 'partitioner.class' property.")
+ .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING)
+ .defaultValue(RANDOM_PARTITIONING.getValue())
+ .required(false)
+ .build();
+
+ static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
+ .name(ProducerConfig.COMPRESSION_TYPE_CONFIG)
+ .displayName("Compression Type")
+ .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
+ .required(true)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues("none", "gzip", "snappy", "lz4")
+ .defaultValue("none")
+ .build();
+
+ static final PropertyDescriptor ATTRIBUTE_NAME_REGEX = new PropertyDescriptor.Builder()
+ .name("attribute-name-regex")
+ .displayName("Attributes to Send as Headers (Regex)")
+ .description("A Regular Expression that is matched against all FlowFile attribute names. "
+ + "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+ + "If not specified, no FlowFile attributes will be added as headers.")
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .expressionLanguageSupported(false)
+ .required(false)
+ .build();
+ static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
+ .name("use-transactions")
+ .displayName("Use Transactions")
+ .description("Specifies whether or not NiFi should provide Transactional guarantees when communicating with Kafka. If there is a problem sending data to Kafka, "
+ + "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
+ + "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
+ + "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
+ .expressionLanguageSupported(false)
+ .allowableValues("true", "false")
+ .defaultValue("true")
+ .required(true)
+ .build();
+ static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
+ .name("message-header-encoding")
+ .displayName("Message Header Encoding")
+ .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
+ + "this property indicates the Character Encoding to use for serializing the headers.")
+ .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
+ .defaultValue("UTF-8")
+ .required(false)
+ .build();
+
+ static final Relationship REL_SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("FlowFiles for which all content was sent to Kafka.")
+ .build();
+
+ static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+ .build();
+
+ private static final List<PropertyDescriptor> PROPERTIES;
+ private static final Set<Relationship> RELATIONSHIPS;
+
+ private volatile PublisherPool publisherPool = null;
+
+ static {
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
+ properties.add(TOPIC);
+ properties.add(DELIVERY_GUARANTEE);
+ properties.add(USE_TRANSACTIONS);
+ properties.add(ATTRIBUTE_NAME_REGEX);
+ properties.add(MESSAGE_HEADER_ENCODING);
+ properties.add(KEY);
+ properties.add(KEY_ATTRIBUTE_ENCODING);
+ properties.add(MESSAGE_DEMARCATOR);
+ properties.add(MAX_REQUEST_SIZE);
+ properties.add(ACK_WAIT_TIME);
+ properties.add(METADATA_WAIT_TIME);
+ properties.add(PARTITION_CLASS);
+ properties.add(COMPRESSION_CODEC);
+
+ PROPERTIES = Collections.unmodifiableList(properties);
+
+ final Set<Relationship> relationships = new HashSet<>();
+ relationships.add(REL_SUCCESS);
+ relationships.add(REL_FAILURE);
+ RELATIONSHIPS = Collections.unmodifiableSet(relationships);
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+ .name(propertyDescriptorName)
+ .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+ results.addAll(KafkaProcessorUtils.validateCommonProperties(validationContext));
+
+ final boolean useTransactions = validationContext.getProperty(USE_TRANSACTIONS).asBoolean();
+ if (useTransactions) {
+ final String deliveryGuarantee = validationContext.getProperty(DELIVERY_GUARANTEE).getValue();
+ if (!DELIVERY_REPLICATED.getValue().equals(deliveryGuarantee)) {
+ results.add(new ValidationResult.Builder()
+ .subject("Delivery Guarantee")
+ .valid(false)
+ .explanation("In order to use Transactions, the Delivery Guarantee must be \"Guarantee Replicated Delivery.\" "
+ + "Either change the <Use Transactions> property or the <Delivery Guarantee> property.")
+ .build());
+ }
+ }
+
+ return results;
+ }
+
+ private synchronized PublisherPool getPublisherPool(final ProcessContext context) {
+ PublisherPool pool = publisherPool;
+ if (pool != null) {
+ return pool;
+ }
+
+ return publisherPool = createPublisherPool(context);
+ }
+
+ protected PublisherPool createPublisherPool(final ProcessContext context) {
+ final int maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
+ final long maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+ final String attributeNameRegex = context.getProperty(ATTRIBUTE_NAME_REGEX).getValue();
+ final Pattern attributeNamePattern = attributeNameRegex == null ? null : Pattern.compile(attributeNameRegex);
+ final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+ final String charsetName = context.getProperty(MESSAGE_HEADER_ENCODING).evaluateAttributeExpressions().getValue();
+ final Charset charset = Charset.forName(charsetName);
+
+ final Map<String, Object> kafkaProperties = new HashMap<>();
+ KafkaProcessorUtils.buildCommonKafkaProperties(context, ProducerConfig.class, kafkaProperties);
+ kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
+ kafkaProperties.put("max.request.size", String.valueOf(maxMessageSize));
+
+ return new PublisherPool(kafkaProperties, getLogger(), maxMessageSize, maxAckWaitMillis, useTransactions, attributeNamePattern, charset);
+ }
+
+ @OnStopped
+ public void closePool() {
+ if (publisherPool != null) {
+ publisherPool.close();
+ }
+
+ publisherPool = null;
+ }
+
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+ final boolean useDemarcator = context.getProperty(MESSAGE_DEMARCATOR).isSet();
+
+ final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 500));
+ if (flowFiles.isEmpty()) {
+ return;
+ }
+
+ final PublisherPool pool = getPublisherPool(context);
+ if (pool == null) {
+ context.yield();
+ return;
+ }
+
+ final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
+ final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
+ final boolean useTransactions = context.getProperty(USE_TRANSACTIONS).asBoolean();
+
+ final long startTime = System.nanoTime();
+ try (final PublisherLease lease = pool.obtainPublisher()) {
+ if (useTransactions) {
+ lease.beginTransaction();
+ }
+
+ // Send each FlowFile to Kafka asynchronously.
+ for (final FlowFile flowFile : flowFiles) {
+ if (!isScheduled()) {
+ // If stopped, re-queue FlowFile instead of sending it
+ if (useTransactions) {
+ session.rollback();
+ lease.rollback();
+ return;
+ }
+
+ session.transfer(flowFile);
+ continue;
+ }
+
+ final byte[] messageKey = getMessageKey(flowFile, context);
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+ final byte[] demarcatorBytes;
+ if (useDemarcator) {
+ demarcatorBytes = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions(flowFile).getValue().getBytes(StandardCharsets.UTF_8);
+ } else {
+ demarcatorBytes = null;
+ }
+
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream rawIn) throws IOException {
+ try (final InputStream in = new BufferedInputStream(rawIn)) {
+ lease.publish(flowFile, in, messageKey, demarcatorBytes, topic);
+ }
+ }
+ });
+ }
+
+ // Complete the send
+ final PublishResult publishResult = lease.complete();
+
+ if (publishResult.isFailure()) {
+ getLogger().info("Failed to send FlowFile to kafka; transferring to failure");
+ session.transfer(flowFiles, REL_FAILURE);
+ return;
+ }
+
+ // Transfer any successful FlowFiles.
+ final long transmissionMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
+ for (FlowFile success : flowFiles) {
+ final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(success).getValue();
+
+ final int msgCount = publishResult.getSuccessfulMessageCount(success);
+ success = session.putAttribute(success, MSG_COUNT, String.valueOf(msgCount));
+ session.adjustCounter("Messages Sent", msgCount, true);
+
+ final String transitUri = KafkaProcessorUtils.buildTransitURI(securityProtocol, bootstrapServers, topic);
+ session.getProvenanceReporter().send(success, transitUri, "Sent " + msgCount + " messages", transmissionMillis);
+ session.transfer(success, REL_SUCCESS);
+ }
+ }
+ }
+
+
+ private byte[] getMessageKey(final FlowFile flowFile, final ProcessContext context) {
+ if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
+ return null;
+ }
+
+ final String uninterpretedKey;
+ if (context.getProperty(KEY).isSet()) {
+ uninterpretedKey = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+ } else {
+ uninterpretedKey = flowFile.getAttribute(KafkaProcessorUtils.KAFKA_KEY);
+ }
+
+ if (uninterpretedKey == null) {
+ return null;
+ }
+
+ final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
+ if (UTF8_ENCODING.getValue().equals(keyEncoding)) {
+ return uninterpretedKey.getBytes(StandardCharsets.UTF_8);
+ }
+
+ return DatatypeConverter.parseHexBinary(uninterpretedKey);
+ }
+}
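
A quick illustration of the <Key Attribute Encoding> branch in getMessageKey() above: the same attribute value yields different key bytes depending on the chosen encoding. Standalone sketch, not part of the commit.

import java.nio.charset.StandardCharsets;

import javax.xml.bind.DatatypeConverter;

public class KeyEncodingSketch {
    public static void main(final String[] args) {
        final String uninterpretedKey = "6B6579"; // hex spelling of the bytes of "key"

        // UTF-8 Encoded: the six characters themselves become the key bytes.
        final byte[] utf8Key = uninterpretedKey.getBytes(StandardCharsets.UTF_8);

        // Hex Encoded: each character pair is parsed into one byte (0x6B 0x65 0x79).
        final byte[] hexKey = DatatypeConverter.parseHexBinary(uninterpretedKey);

        System.out.println(utf8Key.length + " bytes vs " + hexKey.length + " bytes"); // prints "6 bytes vs 3 bytes"
    }
}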
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
new file mode 100644
index 0000000..1f7c3ab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishResult.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import org.apache.nifi.flowfile.FlowFile;
+
+public interface PublishResult {
+
+ boolean isFailure();
+
+ int getSuccessfulMessageCount(FlowFile flowFile);
+
+ Exception getReasonForFailure(FlowFile flowFile);
+
+ public static PublishResult EMPTY = new PublishResult() {
+ @Override
+ public boolean isFailure() {
+ return false;
+ }
+
+ @Override
+ public int getSuccessfulMessageCount(FlowFile flowFile) {
+ return 0;
+ }
+
+ @Override
+ public Exception getReasonForFailure(FlowFile flowFile) {
+ return null;
+ }
+ };
+}
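
PublishResult.EMPTY is a null object: a lease that never sent a message can hand back a result that reports success and zero counts instead of null. A hypothetical caller, for illustration only (the ternary below is a sketch, not code from this commit):

// Avoids null checks at every call site, mirroring PublisherLease.complete().
final PublishResult result = (tracker == null) ? PublishResult.EMPTY : tracker.createPublishResult();
if (!result.isFailure()) {
    // nothing to route to 'failure'; per-FlowFile counts come from getSuccessfulMessageCount()
}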
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
new file mode 100644
index 0000000..abcd15f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.header.Headers;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.exception.TokenTooLargeException;
+import org.apache.nifi.stream.io.util.StreamDemarcator;
+
+public class PublisherLease implements Closeable {
+ private final ComponentLog logger;
+ private final Producer<byte[], byte[]> producer;
+ private final int maxMessageSize;
+ private final long maxAckWaitMillis;
+ private final boolean useTransactions;
+ private final Pattern attributeNameRegex;
+ private final Charset headerCharacterSet;
+ private volatile boolean poisoned = false;
+ private final AtomicLong messagesSent = new AtomicLong(0L);
+
+ private volatile boolean transactionsInitialized = false;
+ private volatile boolean activeTransaction = false;
+
+ private InFlightMessageTracker tracker;
+
+ public PublisherLease(final Producer<byte[], byte[]> producer, final int maxMessageSize, final long maxAckWaitMillis, final ComponentLog logger,
+ final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+ this.producer = producer;
+ this.maxMessageSize = maxMessageSize;
+ this.logger = logger;
+ this.maxAckWaitMillis = maxAckWaitMillis;
+ this.useTransactions = useTransactions;
+ this.attributeNameRegex = attributeNameRegex;
+ this.headerCharacterSet = headerCharacterSet;
+ }
+
+ protected void poison() {
+ this.poisoned = true;
+ }
+
+ public boolean isPoisoned() {
+ return poisoned;
+ }
+
+ void beginTransaction() {
+ if (!useTransactions) {
+ return;
+ }
+
+ if (!transactionsInitialized) {
+ producer.initTransactions();
+ transactionsInitialized = true;
+ }
+
+ producer.beginTransaction();
+ activeTransaction = true;
+ }
+
+ void rollback() {
+ if (!useTransactions || !activeTransaction) {
+ return;
+ }
+
+ producer.abortTransaction();
+ activeTransaction = false;
+ }
+
+ void fail(final FlowFile flowFile, final Exception cause) {
+ getTracker().fail(flowFile, cause);
+ rollback();
+ }
+
+ void publish(final FlowFile flowFile, final InputStream flowFileContent, final byte[] messageKey, final byte[] demarcatorBytes, final String topic) throws IOException {
+ if (tracker == null) {
+ tracker = new InFlightMessageTracker(logger);
+ }
+
+ try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+ byte[] messageContent;
+ try {
+ while ((messageContent = demarcator.nextToken()) != null) {
+ // We do not want to use any key if we have a demarcator because that would result in
+ // the key being the same for multiple messages
+ final byte[] keyToUse = demarcatorBytes == null ? messageKey : null;
+ publish(flowFile, keyToUse, messageContent, topic, tracker);
+
+ if (tracker.isFailed(flowFile)) {
+ // If we have a failure, don't try to send anything else.
+ return;
+ }
+ }
+ } catch (final TokenTooLargeException ttle) {
+ tracker.fail(flowFile, ttle);
+ }
+ } catch (final Exception e) {
+ tracker.fail(flowFile, e);
+ poison();
+ throw e;
+ }
+ }
+
+ void publish(final FlowFile flowFile, final RecordSet recordSet, final RecordSetWriterFactory writerFactory, final RecordSchema schema,
+ final String messageKeyField, final String topic) throws IOException {
+ if (tracker == null) {
+ tracker = new InFlightMessageTracker(logger);
+ }
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream(1024);
+
+ Record record;
+ int recordCount = 0;
+
+ try (final RecordSetWriter writer = writerFactory.createWriter(logger, schema, baos)) {
+ while ((record = recordSet.next()) != null) {
+ recordCount++;
+ baos.reset();
+
+ writer.write(record);
+ writer.flush();
+
+ final byte[] messageContent = baos.toByteArray();
+ final String key = messageKeyField == null ? null : record.getAsString(messageKeyField);
+ final byte[] messageKey = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+
+ publish(flowFile, messageKey, messageContent, topic, tracker);
+
+ if (tracker.isFailed(flowFile)) {
+ // If we have a failure, don't try to send anything else.
+ return;
+ }
+ }
+
+ if (recordCount == 0) {
+ tracker.trackEmpty(flowFile);
+ }
+ } catch (final TokenTooLargeException ttle) {
+ tracker.fail(flowFile, ttle);
+ } catch (final SchemaNotFoundException snfe) {
+ throw new IOException(snfe);
+ } catch (final Exception e) {
+ tracker.fail(flowFile, e);
+ poison();
+ throw e;
+ }
+ }
+
+ private void addHeaders(final FlowFile flowFile, final ProducerRecord<?, ?> record) {
+ if (attributeNameRegex == null) {
+ return;
+ }
+
+ final Headers headers = record.headers();
+ for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
+ if (attributeNameRegex.matcher(entry.getKey()).matches()) {
+ headers.add(entry.getKey(), entry.getValue().getBytes(headerCharacterSet));
+ }
+ }
+ }
+
+ protected void publish(final FlowFile flowFile, final byte[] messageKey, final byte[] messageContent, final String topic, final InFlightMessageTracker tracker) {
+ final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, messageKey, messageContent);
+ addHeaders(flowFile, record);
+
+ producer.send(record, new Callback() {
+ @Override
+ public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+ if (exception == null) {
+ tracker.incrementAcknowledgedCount(flowFile);
+ } else {
+ tracker.fail(flowFile, exception);
+ poison();
+ }
+ }
+ });
+
+ messagesSent.incrementAndGet();
+ tracker.incrementSentCount(flowFile);
+ }
+
+
+ public PublishResult complete() {
+ if (tracker == null) {
+ if (messagesSent.get() == 0L) {
+ return PublishResult.EMPTY;
+ }
+
+ rollback();
+ throw new IllegalStateException("Cannot complete publishing to Kafka because Publisher Lease was already closed");
+ }
+
+ producer.flush();
+
+ if (activeTransaction) {
+ producer.commitTransaction();
+ activeTransaction = false;
+ }
+
+ try {
+ tracker.awaitCompletion(maxAckWaitMillis);
+ return tracker.createPublishResult();
+ } catch (final InterruptedException e) {
+ logger.warn("Interrupted while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+ Thread.currentThread().interrupt();
+ return tracker.failOutstanding(e);
+ } catch (final TimeoutException e) {
+ logger.warn("Timed out while waiting for an acknowledgement from Kafka; some FlowFiles may be transferred to 'failure' even though they were received by Kafka");
+ return tracker.failOutstanding(e);
+ } finally {
+ tracker = null;
+ }
+ }
+
+ @Override
+ public void close() {
+ producer.close(maxAckWaitMillis, TimeUnit.MILLISECONDS);
+ tracker = null;
+ }
+
+ public InFlightMessageTracker getTracker() {
+ if (tracker == null) {
+ tracker = new InFlightMessageTracker(logger);
+ }
+
+ return tracker;
+ }
+}
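
The transaction handling in PublisherLease (beginTransaction(), complete(), rollback()) wraps the Kafka 0.11 transactional producer API nearly one-to-one. Below is a minimal standalone sketch of those raw calls, not part of the commit; broker address, topic, and payload are placeholders.

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.UUID;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class TransactionalSendSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("key.serializer", ByteArraySerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        props.put("transactional.id", UUID.randomUUID().toString()); // as PublisherPool.createLease() does
        props.put("acks", "all"); // transactions require replicated delivery

        try (final Producer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions(); // once per producer; tracked by the transactionsInitialized flag above
            producer.beginTransaction(); // lease.beginTransaction()
            try {
                producer.send(new ProducerRecord<>("my-topic", "hello".getBytes(StandardCharsets.UTF_8)));
                producer.commitTransaction(); // lease.complete() with an active transaction
            } catch (final Exception e) {
                producer.abortTransaction(); // lease.rollback()
                throw e;
            }
        }
    }
}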
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
new file mode 100644
index 0000000..d5caa8d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherPool.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.kafka.pubsub;
+
+import java.io.Closeable;
+import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.nifi.logging.ComponentLog;
+
+public class PublisherPool implements Closeable {
+ private final ComponentLog logger;
+ private final BlockingQueue<PublisherLease> publisherQueue;
+ private final Map<String, Object> kafkaProperties;
+ private final int maxMessageSize;
+ private final long maxAckWaitMillis;
+ private final boolean useTransactions;
+ private final Pattern attributeNameRegex;
+ private final Charset headerCharacterSet;
+
+ private volatile boolean closed = false;
+
+ PublisherPool(final Map<String, Object> kafkaProperties, final ComponentLog logger, final int maxMessageSize, final long maxAckWaitMillis,
+ final boolean useTransactions, final Pattern attributeNameRegex, final Charset headerCharacterSet) {
+ this.logger = logger;
+ this.publisherQueue = new LinkedBlockingQueue<>();
+ this.kafkaProperties = kafkaProperties;
+ this.maxMessageSize = maxMessageSize;
+ this.maxAckWaitMillis = maxAckWaitMillis;
+ this.useTransactions = useTransactions;
+ this.attributeNameRegex = attributeNameRegex;
+ this.headerCharacterSet = headerCharacterSet;
+ }
+
+ public PublisherLease obtainPublisher() {
+ if (isClosed()) {
+ throw new IllegalStateException("Publisher Pool is closed");
+ }
+
+ PublisherLease lease = publisherQueue.poll();
+ if (lease != null) {
+ return lease;
+ }
+
+ lease = createLease();
+ return lease;
+ }
+
+ private PublisherLease createLease() {
+ final Map<String, Object> properties = new HashMap<>(kafkaProperties);
+ if (useTransactions) {
+ properties.put("transactional.id", UUID.randomUUID().toString());
+ }
+
+ final Producer<byte[], byte[]> producer = new KafkaProducer<>(properties);
+
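+ // close() is overridden below so that healthy leases are returned to the pool for reuse;
+ // poisoned leases, or leases closed after the pool itself has been closed, are truly closed.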
+ final PublisherLease lease = new PublisherLease(producer, maxMessageSize, maxAckWaitMillis, logger, useTransactions, attributeNameRegex, headerCharacterSet) {
+ @Override
+ public void close() {
+ if (isPoisoned() || isClosed()) {
+ super.close();
+ } else {
+ publisherQueue.offer(this);
+ }
+ }
+ };
+
+ return lease;
+ }
+
+ public synchronized boolean isClosed() {
+ return closed;
+ }
+
+ @Override
+ public synchronized void close() {
+ closed = true;
+
+ PublisherLease lease;
+ while ((lease = publisherQueue.poll()) != null) {
+ lease.close();
+ }
+ }
+
+ /**
+ * Returns the number of leases that are currently available
+ *
+ * @return the number of leases currently available
+ */
+ protected int available() {
+ return publisherQueue.size();
+ }
+}
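
Similarly, a minimal same-package sketch of constructing the pool (the broker address, message size limit, ack timeout, and header-attribute pattern are all assumed values, not part of this commit):

    package org.apache.nifi.processors.kafka.pubsub;

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;
    import org.apache.nifi.logging.ComponentLog;

    // Illustrative sketch only; not part of this commit.
    class PublisherPoolSketch {

        PublisherPool createPool(final ComponentLog logger) {
            final Map<String, Object> kafkaProperties = new HashMap<>();
            kafkaProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
            kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
            kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

            // 1 MB max message size, 5 second ack timeout, transactions enabled,
            // attributes starting with "kafka." published as headers (all assumed values).
            return new PublisherPool(kafkaProperties, logger, 1024 * 1024, 5000L,
                    true, Pattern.compile("kafka\\..*"), StandardCharsets.UTF_8);
        }
    }

Because close() on a lease obtained from this pool re-queues it rather than closing the underlying KafkaProducer, repeated obtainPublisher()/close() cycles reuse producers until a lease is poisoned or the pool itself is closed.
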
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..6ffd0d2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.kafka.pubsub.PublishKafka_0_11
+org.apache.nifi.processors.kafka.pubsub.PublishKafkaRecord_0_11
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_11
+org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_11
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10/additionalDetails.html
new file mode 100644
index 0000000..018a289
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafkaRecord_0_10/additionalDetails.html
@@ -0,0 +1,143 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>ConsumeKafkaRecord</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <h2>Description</h2>
+ <p>
+ This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+ for data using the KafkaConsumer API available with Kafka 0.11.x. When a message is received
+ from Kafka, the message will be deserialized using the configured Record Reader, and then
+ written to a FlowFile by serializing the message with the configured Record Writer.
+ </p>
+
+
+ <h2>Security Configuration</h2>
+ <p>
+ The Security Protocol property allows the user to specify the protocol for communicating
+ with the Kafka broker. The following sections describe each of the protocols in further detail.
+ </p>
+ <h3>PLAINTEXT</h3>
+ <p>
+ This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+ In order to use this option the broker must be configured with a listener of the form:
+ <pre>
+ PLAINTEXT://host.name:port
+ </pre>
+ </p>
+ <h3>SSL</h3>
+ <p>
+ This option provides an encrypted connection to the broker, with optional client authentication. In order
+ to use this option the broker must be configured with a listener of the form:
+ <pre>
+ SSL://host.name:port
+ </pre>
+ In addition, the processor must have an SSL Context Service selected.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+ not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+ a truststore containing the public key of the certificate authority used to sign the broker's key.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+ In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+ a truststore as described above.
+ </p>
+ <h3>SASL_PLAINTEXT</h3>
+ <p>
+ This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_PLAINTEXT://host.name:port
+ </pre>
+ In addition, the Kerberos Service Name must be specified in the processor.
+ </p>
+ <h4>SASL_PLAINTEXT - GSSAPI</h4>
+ <p>
+ If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+ JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+ NiFi's bootstrap.conf, such as:
+ <pre>
+ java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+ </pre>
+ </p>
+ <p>
+ An example of the JAAS config file would be the following:
+ <pre>
+ KafkaClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/path/to/nifi.keytab"
+ serviceName="kafka"
+ principal="nifi@YOURREALM.COM";
+ };
+ </pre>
+ <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+ </p>
+ <p>
+ Alternatively, when using GSSAPI, the JAAS configuration can be provided by specifying the Kerberos
+ Principal and Kerberos Keytab directly in the processor properties. This will dynamically create a JAAS
+ configuration like the one above, and will take precedence over the java.security.auth.login.config
+ system property.
+ </p>
+ <h4>SASL_PLAINTEXT - PLAIN</h4>
+ <p>
+ If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+ the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+ be the following:
+ <pre>
+ KafkaClient {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="nifi"
+ password="nifi-password";
+ };
+ </pre>
+ </p>
+ <p>
+ <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+ the username and password unencrypted.
+ </p>
+ <p>
+ <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+ it visible to components in other NARs that may access the providers. There is currently a known issue
+ where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+ </p>
+ <h3>SASL_SSL</h3>
+ <p>
+ This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_SSL://host.name:port
+ </pre>
+ </p>
+ <p>
+ See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+ depending on the SASL mechanism (GSSAPI or PLAIN).
+ </p>
+ <p>
+ See the SSL section for a description of how to configure the SSL Context Service based on the
+ ssl.client.auth property.
+ </p>
+
+ </body>
+</html>
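
As context for the listener forms above, a hedged sketch of the corresponding broker-side server.properties entries (host names, ports, passwords, and keystore paths are placeholders, not part of this commit):

    # illustrative broker configuration (server.properties)
    listeners=PLAINTEXT://host.name:9092,SSL://host.name:9093
    ssl.keystore.location=/path/to/broker.keystore.jks
    ssl.keystore.password=broker-keystore-password
    ssl.truststore.location=/path/to/broker.truststore.jks
    ssl.truststore.password=broker-truststore-password
    # require clients to present a certificate, as discussed in the SSL section above
    ssl.client.auth=required
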
http://git-wip-us.apache.org/repos/asf/nifi/blob/3fb704c5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10/additionalDetails.html b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10/additionalDetails.html
new file mode 100644
index 0000000..760066d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_0_10/additionalDetails.html
@@ -0,0 +1,143 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>ConsumeKafka</title>
+ <link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <h2>Description</h2>
+ <p>
+ This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+ for data using the KafkaConsumer API available with Kafka 0.11.x. When a message is received
+ from Kafka, this Processor emits a FlowFile where the content of the FlowFile is the value
+ of the Kafka message.
+ </p>
+
+
+ <h2>Security Configuration</h2>
+ <p>
+ The Security Protocol property allows the user to specify the protocol for communicating
+ with the Kafka broker. The following sections describe each of the protocols in further detail.
+ </p>
+ <h3>PLAINTEXT</h3>
+ <p>
+ This option provides an unsecured connection to the broker, with no client authentication and no encryption.
+ In order to use this option the broker must be configured with a listener of the form:
+ <pre>
+ PLAINTEXT://host.name:port
+ </pre>
+ </p>
+ <h3>SSL</h3>
+ <p>
+ This option provides an encrypted connection to the broker, with optional client authentication. In order
+ to use this option the broker must be configured with a listener of the form:
+ <pre>
+ SSL://host.name:port
+ </pre>
+ In addition, the processor must have an SSL Context Service selected.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=none, or does not specify ssl.client.auth, then the client will
+ not be required to present a certificate. In this case, the SSL Context Service selected may specify only
+ a truststore containing the public key of the certificate authority used to sign the broker's key.
+ </p>
+ <p>
+ If the broker specifies ssl.client.auth=required then the client will be required to present a certificate.
+ In this case, the SSL Context Service must also specify a keystore containing a client key, in addition to
+ a truststore as described above.
+ </p>
+ <h3>SASL_PLAINTEXT</h3>
+ <p>
+ This option uses SASL with a PLAINTEXT transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_PLAINTEXT://host.name:port
+ </pre>
+ In addition, the Kerberos Service Name must be specified in the processor.
+ </p>
+ <h4>SASL_PLAINTEXT - GSSAPI</h4>
+ <p>
+ If the SASL mechanism is GSSAPI, then the client must provide a JAAS configuration to authenticate. The
+ JAAS configuration can be provided by specifying the java.security.auth.login.config system property in
+ NiFi's bootstrap.conf, such as:
+ <pre>
+ java.arg.16=-Djava.security.auth.login.config=/path/to/kafka_client_jaas.conf
+ </pre>
+ </p>
+ <p>
+ An example of the JAAS config file would be the following:
+ <pre>
+ KafkaClient {
+ com.sun.security.auth.module.Krb5LoginModule required
+ useKeyTab=true
+ storeKey=true
+ keyTab="/path/to/nifi.keytab"
+ serviceName="kafka"
+ principal="nifi@YOURREALM.COM";
+ };
+ </pre>
+ <b>NOTE:</b> The serviceName in the JAAS file must match the Kerberos Service Name in the processor.
+ </p>
+ <p>
+ Alternatively, when using GSSAPI, the JAAS configuration can be provided by specifying the Kerberos
+ Principal and Kerberos Keytab directly in the processor properties. This will dynamically create a JAAS
+ configuration like the one above, and will take precedence over the java.security.auth.login.config
+ system property.
+ </p>
+ <h4>SASL_PLAINTEXT - PLAIN</h4>
+ <p>
+ If the SASL mechanism is PLAIN, then the client must provide a JAAS configuration to authenticate, but
+ the JAAS configuration must use Kafka's PlainLoginModule. An example of the JAAS config file would
+ be the following:
+ <pre>
+ KafkaClient {
+ org.apache.kafka.common.security.plain.PlainLoginModule required
+ username="nifi"
+ password="nifi-password";
+ };
+ </pre>
+ </p>
+ <p>
+ <b>NOTE:</b> It is not recommended to use a SASL mechanism of PLAIN with SASL_PLAINTEXT, as it would transmit
+ the username and password unencrypted.
+ </p>
+ <p>
+ <b>NOTE:</b> Using the PlainLoginModule will cause it to be registered in the JVM's static list of Providers, making
+ it visible to components in other NARs that may access the providers. There is currently a known issue
+ where Kafka processors using the PlainLoginModule will cause HDFS processors with Kerberos to no longer work.
+ </p>
+ <h3>SASL_SSL</h3>
+ <p>
+ This option uses SASL with an SSL/TLS transport layer to authenticate to the broker. In order to use this
+ option the broker must be configured with a listener of the form:
+ <pre>
+ SASL_SSL://host.name:port
+ </pre>
+ </p>
+ <p>
+ See the SASL_PLAINTEXT section for a description of how to provide the proper JAAS configuration
+ depending on the SASL mechanism (GSSAPI or PLAIN).
+ </p>
+ <p>
+ See the SSL section for a description of how to configure the SSL Context Service based on the
+ ssl.client.auth property.
+ </p>
+
+ </body>
+</html>