You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/02 16:02:38 UTC

incubator-nifi git commit: NIFI-220: Initial Import of GetKafka and PutKafka processors

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-220 [created] 3e2f79067


NIFI-220: Initial Import of GetKafka and PutKafka processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3e2f7906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3e2f7906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3e2f7906

Branch: refs/heads/NIFI-220
Commit: 3e2f79067251adfd2846a3aea42c052e7cae8e92
Parents: 68b7ad7
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jan 1 20:52:36 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 1 20:52:36 2015 -0500

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 +
 nar-bundles/kafka-bundle/kafka-nar/pom.xml      |  37 +++
 .../kafka-bundle/kafka-processors/pom.xml       |  76 ++++++
 .../apache/nifi/processors/kafka/GetKafka.java  | 259 +++++++++++++++++++
 .../apache/nifi/processors/kafka/PutKafka.java  | 252 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../index.html                                  | 147 +++++++++++
 .../index.html                                  | 177 +++++++++++++
 .../nifi/processors/kafka/TestGetKafka.java     |  59 +++++
 .../nifi/processors/kafka/TestPutKafka.java     |  48 ++++
 nar-bundles/kafka-bundle/pom.xml                |  35 +++
 pom.xml                                         |   6 +
 12 files changed, 1103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ae74485..0d00da3 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -203,6 +203,11 @@
             <artifactId>hadoop-nar</artifactId>
             <type>nar</type>
         </dependency>
+        <!-- Bundles the Kafka processors NAR into the assembly. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-nar</artifactId>
+            <type>nar</type>
+        </dependency>
     </dependencies>
     
     <properties>        

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-nar/pom.xml b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
new file mode 100644
index 0000000..8dfefdd
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kafka-nar</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Kafka NAR</name>
+    <packaging>nar</packaging>
+    <description>NiFi NAR for interacting with Apache Kafka</description>
+
+    <dependencies>
+        <!-- The processors packaged by this NAR; kept in lock-step with this module's version. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-processors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/pom.xml b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
new file mode 100644
index 0000000..8cad323
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
@@ -0,0 +1,76 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kafka-processors</artifactId>
+    <packaging>jar</packaging>
+
+    <name>kafka-processors</name>
+
+    <dependencies>
+        <!-- NiFi framework dependencies; no versions are declared here. -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-processor-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+
+        <!-- Kafka client library -->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.8.2</artifactId>
+            <version>0.8.1</version>
+            <exclusions>
+                <!-- Transitive dependencies excluded because they are located
+                     in a legacy Maven repository, which Maven 3 doesn't support. -->
+                <exclusion>
+                    <groupId>javax.jms</groupId>
+                    <artifactId>jms</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jdmk</groupId>
+                    <artifactId>jmxtools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>com.sun.jmx</groupId>
+                    <artifactId>jmxri</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Test-only dependencies -->
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
new file mode 100644
index 0000000..55c67e3
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -0,0 +1,259 @@
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@SupportsBatching
+@CapabilityDescription("Fetches messages from Apache Kafka")
+@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+public class GetKafka extends AbstractProcessor {
+    public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
+        .name("ZooKeeper Connection String")
+        .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("Topic Name")
+        .description("The Kafka Topic to pull messages from")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
+		.name("Zookeeper Commit Frequency")
+		.description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
+		.required(true)
+		.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+		.expressionLanguageSupported(false)
+		.defaultValue("60 secs")
+		.build();
+    public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
+	    .name("ZooKeeper Communications Timeout")
+	    .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
+	    .required(true)
+	    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .defaultValue("30 secs")
+	    .build();
+    public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
+	    .name("Kafka Communications Timeout")
+	    .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+	    .required(true)
+	    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .defaultValue("30 secs")
+	    .build();
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+        .name("Client Name")
+        .description("Client Name to use when communicating with Kafka")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+	    .name("success")
+	    .description("All FlowFiles that are created are routed to this relationship")
+	    .build();
+
+    
+    private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
+    private volatile ConsumerConnector consumer;
+
+    final Lock interruptionLock = new ReentrantLock();
+    // guarded by interruptionLock
+    private final Set<Thread> interruptableThreads = new HashSet<>();
+    
+    /**
+     * Returns the properties this processor exposes, in the order they are added below.
+     *
+     * @return a new list holding the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(ZOOKEEPER_CONNECTION_STRING);
+        descriptors.add(TOPIC);
+        descriptors.add(ZOOKEEPER_COMMIT_DELAY);
+        // CLIENT_NAME has no static default; expose a copy whose default is
+        // derived from this processor's identifier.
+        descriptors.add(new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CLIENT_NAME)
+            .defaultValue("NiFi-" + getIdentifier())
+            .build());
+        descriptors.add(KAFKA_TIMEOUT);
+        descriptors.add(ZOOKEEPER_TIMEOUT);
+        return descriptors;
+    }
+    
+    /**
+     * Returns the relationships of this processor; 'success' is the only one.
+     *
+     * @return a new set containing {@link #REL_SUCCESS}
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> onlySuccess = new HashSet<>(1);
+        onlySuccess.add(REL_SUCCESS);
+        return onlySuccess;
+    }
+    
+    /**
+     * Connects to Kafka (via ZooKeeper) when the processor is scheduled and opens one
+     * message stream per concurrent task for the configured topic. The iterators of
+     * those streams are pooled in {@code streamIterators} for onTrigger to borrow.
+     *
+     * @param context provides the configured property values and the maximum number
+     *                of concurrent tasks
+     */
+    @OnScheduled
+    public void createConsumers(final ProcessContext context) {
+        final String topic = context.getProperty(TOPIC).getValue();
+
+        // Request as many streams as there are concurrent tasks.
+        final Map<String, Integer> topicCountMap = new HashMap<>(1);
+        topicCountMap.put(topic, context.getMaxConcurrentTasks());
+
+        final Properties props = new Properties();
+        props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
+        props.setProperty("group.id", getIdentifier());
+        props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+        props.setProperty("auto.commit.enable", "true"); // just be explicit
+        props.setProperty("auto.offset.reset", "smallest");
+
+        // Apply the user-facing timeout and client-name properties; previously they
+        // were exposed in the UI but never handed to the Kafka consumer.
+        props.setProperty("zookeeper.connection.timeout.ms", String.valueOf(context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)));
+        props.setProperty("socket.timeout.ms", String.valueOf(context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS)));
+        final String clientName = context.getProperty(CLIENT_NAME).getValue();
+        if ( clientName != null ) {
+            // Properties rejects null values, so only set the id when one is available.
+            props.setProperty("client.id", clientName);
+        }
+
+        final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+        consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+
+        final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+        final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+
+        // Drop any iterators left over from a previous run before pooling the new ones.
+        this.streamIterators.clear();
+
+        for ( final KafkaStream<byte[], byte[]> stream : streams ) {
+            streamIterators.add(stream.iterator());
+        }
+    }
+    
+    /**
+     * Commits the consumed offsets and shuts the Kafka consumer down once the
+     * processor has stopped. Does nothing if no consumer was ever created.
+     */
+    @OnStopped
+    public void shutdownConsumer() {
+        if ( consumer == null ) {
+            return;
+        }
+
+        try {
+            consumer.commitOffsets();
+        } finally {
+            // Shut down even when the final offset commit fails.
+            consumer.shutdown();
+        }
+    }
+    
+    /**
+     * Interrupts every thread currently registered in {@code interruptableThreads}
+     * and then empties that set.
+     *
+     * Kafka offers no non-blocking API for pulling messages, so interrupting the
+     * threads is how the Processor gains the ability to shut down when it is stopped.
+     */
+    @OnUnscheduled
+    public void interruptIterators() {
+        interruptionLock.lock();
+        try {
+            for ( final Thread registered : interruptableThreads ) {
+                registered.interrupt();
+            }
+
+            interruptableThreads.clear();
+        } finally {
+            interruptionLock.unlock();
+        }
+    }
+    
+    /**
+     * Borrows one stream iterator from the pool, pulls at most one message from it and
+     * emits that message as a FlowFile on the 'success' relationship. The message key
+     * (when present), offset, partition and topic are recorded as {@code kafka.*}
+     * attributes.
+     *
+     * @param context the process context (not read by this method)
+     * @param session the session used to create, write and transfer the FlowFile
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+        if ( iterator == null ) {
+            // Every iterator is currently borrowed by another task.
+            return;
+        }
+
+        FlowFile flowFile = null;
+        try {
+            // Register this thread so that interruptIterators() can interrupt it.
+            interruptionLock.lock();
+            try {
+                interruptableThreads.add(Thread.currentThread());
+            } finally {
+                interruptionLock.unlock();
+            }
+
+            try {
+                if (!iterator.hasNext() ) {
+                    return;
+                }
+            } catch (final Exception e) {
+                // The message needs a {} placeholder for the exception argument to be rendered.
+                getLogger().warn("Failed to invoke hasNext() due to {}", new Object[] {e});
+                // Null the reference so the finally block does not return this iterator to the pool.
+                iterator = null;
+                return;
+            }
+
+            final long start = System.nanoTime();
+            final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+
+            if ( mam == null ) {
+                return;
+            }
+
+            final byte[] key = mam.key();
+
+            final Map<String, String> attributes = new HashMap<>();
+            if ( key != null ) {
+                attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+            }
+            attributes.put("kafka.offset", String.valueOf(mam.offset()));
+            attributes.put("kafka.partition", String.valueOf(mam.partition()));
+            attributes.put("kafka.topic", mam.topic());
+
+            flowFile = session.create();
+            flowFile = session.write(flowFile, new OutputStreamCallback() {
+                @Override
+                public void process(final OutputStream out) throws IOException {
+                    out.write(mam.message());
+                }
+            });
+
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
+            getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
+            session.transfer(flowFile, REL_SUCCESS);
+        } catch (final Exception e) {
+            getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
+            if ( flowFile != null ) {
+                // Discard the partially built FlowFile.
+                session.remove(flowFile);
+            }
+        } finally {
+            // Deregister the thread before the iterator is handed back.
+            interruptionLock.lock();
+            try {
+                interruptableThreads.remove(Thread.currentThread());
+            } finally {
+                interruptionLock.unlock();
+            }
+
+            if ( iterator != null ) {
+                streamIterators.offer(iterator);
+            }
+        }
+    }
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
new file mode 100644
index 0000000..5e5940c
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
+public class PutKafka extends AbstractProcessor {
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+    
+    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
+    
+    public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
+        .name("Known Brokers")
+        .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+        .required(true)
+        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+	    .name("Topic Name")
+	    .description("The Kafka Topic of interest")
+	    .required(true)
+	    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+	    .expressionLanguageSupported(true)
+	    .build();
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+		.name("Kafka Key")
+		.description("The Key to use for the Message")
+		.required(false)
+		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+		.expressionLanguageSupported(true)
+		.build();
+    public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+		.name("Delivery Guarantee")
+		.description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+		.required(true)
+		.expressionLanguageSupported(false)
+		.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+		.defaultValue(DELIVERY_BEST_EFFORT.getValue())
+		.build();
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+	    .name("Communications Timeout")
+	    .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+	    .required(true)
+	    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .defaultValue("30 secs")
+	    .build();
+    public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
+		.name("Max FlowFile Size")
+		.description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
+		.required(true)
+		.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+		.expressionLanguageSupported(false)
+		.defaultValue("1 MB")
+		.build();
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+	    .name("Client Name")
+	    .description("Client Name to use when communicating with Kafka")
+	    .required(true)
+	    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .build();
+
+    
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+	    .name("success")
+	    .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+	    .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+	    .name("failure")
+	    .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+	    .build();
+    public static final Relationship REL_REJECT = new Relationship.Builder()
+	    .name("reject")
+	    .description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
+	    .build();
+
+    private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
+    
+    /**
+     * Returns the properties this processor exposes, in the order they are added below.
+     *
+     * @return a new list holding the supported property descriptors
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
+        descriptors.add(SEED_BROKERS);
+        descriptors.add(TOPIC);
+        descriptors.add(KEY);
+        descriptors.add(DELIVERY_GUARANTEE);
+        descriptors.add(TIMEOUT);
+        descriptors.add(MAX_FLOWFILE_SIZE);
+        // CLIENT_NAME has no static default; expose a copy whose default is
+        // derived from this processor's identifier.
+        descriptors.add(new PropertyDescriptor.Builder()
+            .fromPropertyDescriptor(CLIENT_NAME)
+            .defaultValue("NiFi-" + getIdentifier())
+            .build());
+        return descriptors;
+    }
+    
+    /**
+     * Returns the relationships of this processor: 'success', 'failure' and 'reject'.
+     *
+     * @return a new set containing the three relationships
+     */
+    @Override
+    public Set<Relationship> getRelationships() {
+        // No capacity hint: the former hint of 1 did not match the three entries added here.
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_REJECT);
+        return relationships;
+    }
+    
+    
+    /**
+     * Drains the producer pool once the processor has stopped, closing every
+     * pooled producer.
+     */
+    @OnStopped
+    public void closeProducers() {
+        for (Producer<byte[], byte[]> pooled = producers.poll(); pooled != null; pooled = producers.poll()) {
+            pooled.close();
+        }
+    }
+    
+    
+    /**
+     * Builds a new synchronous Kafka producer from the processor's current configuration.
+     *
+     * @param context supplies the broker list, delivery guarantee, client name and timeout
+     * @return a newly created producer
+     */
+    private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+        final String brokerList = context.getProperty(SEED_BROKERS).getValue();
+        final String requiredAcks = context.getProperty(DELIVERY_GUARANTEE).getValue();
+        final String clientId = context.getProperty(CLIENT_NAME).getValue();
+        final long timeoutMillis = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue();
+
+        final Properties producerProps = new Properties();
+        producerProps.setProperty("metadata.broker.list", brokerList);
+        producerProps.setProperty("request.required.acks", requiredAcks);
+        producerProps.setProperty("client.id", clientId);
+        producerProps.setProperty("request.timeout.ms", String.valueOf(timeoutMillis));
+
+        // Fixed settings that are not exposed as processor properties.
+        producerProps.setProperty("message.send.max.retries", "1");
+        producerProps.setProperty("producer.type", "sync");
+
+        return new Producer<>(new ProducerConfig(producerProps));
+    }
+    
+    private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
+    	Producer<byte[], byte[]> producer = producers.poll();
+    	return producer == null ? createProducer(context) : producer;
+    }
+    
+    /**
+     * Puts a producer back into the pool so that a later invocation can reuse it.
+     *
+     * @param producer the producer to return
+     */
+    private void returnProducer(final Producer<byte[], byte[]> producer) {
+        this.producers.offer(producer);
+    }
+    
+    /**
+     * Sends the content of one FlowFile to Kafka as a single message.
+     *
+     * FlowFiles larger than the configured maximum are routed to 'reject'. Otherwise the
+     * content is buffered in memory and sent with a pooled producer: the FlowFile goes to
+     * 'success' when the send completes and to 'failure' when it throws. A producer that
+     * failed is closed instead of being returned to the pool.
+     *
+     * @param context supplies the configured property values
+     * @param session the session used to obtain, read and transfer the FlowFile
+     * @throws ProcessException declared by the overridden method
+     */
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if ( flowFile == null ) {
+            return;
+        }
+
+        final long start = System.nanoTime();
+        final long configuredMax = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
+        // The content is buffered in a single byte[], so never accept more than an array
+        // can hold; otherwise the (int) cast below would overflow for a limit above 2 GB.
+        final long maxSize = Math.min(configuredMax, Integer.MAX_VALUE);
+        if ( flowFile.getSize() > maxSize ) {
+            getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
+            session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
+            session.transfer(flowFile, REL_REJECT);
+            return;
+        }
+
+        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+
+        // Safe cast: the size was checked against Integer.MAX_VALUE above.
+        final byte[] value = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+            @Override
+            public void process(final InputStream in) throws IOException {
+                StreamUtils.fillBuffer(in, value);
+            }
+        });
+
+        final Producer<byte[], byte[]> producer = borrowProducer(context);
+        boolean error = false;
+        try {
+            final KeyedMessage<byte[], byte[]> message;
+            if ( key == null ) {
+                message = new KeyedMessage<>(topic, value);
+            } else {
+                message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
+            }
+
+            producer.send(message);
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+
+            // Record the transfer duration on the SEND event, as GetKafka does for RECEIVE.
+            session.getProvenanceReporter().send(flowFile, "kafka://" + topic, millis);
+            session.transfer(flowFile, REL_SUCCESS);
+            getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, millis});
+        } catch (final Exception e) {
+            getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            error = true;
+        } finally {
+            if ( error ) {
+                // Do not reuse a producer that has just failed.
+                producer.close();
+            } else {
+                returnProducer(producer);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..6ae3da1
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,2 @@
+org.apache.nifi.processors.kafka.GetKafka
+org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
new file mode 100644
index 0000000..d429d6b
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -0,0 +1,147 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>GetKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>
+        	This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+        	for data. When a message is received from Kafka, this Processor emits a FlowFile
+        	where the content of the FlowFile is the value of the Kafka message. If the
+        	message has a key associated with it, an attribute named <code>kafka.key</code>
+        	will be added to the FlowFile, with the value being the UTF-8 Encoded value
+        	of the Message's Key.
+        </p>
+        <p>
+        	Kafka supports the notion of a Consumer Group when pulling messages in order to
+        	provide scalability while still offering a publish-subscribe interface. Each
+        	Consumer Group must have a unique identifier. The Consumer Group identifier that
+        	is used by NiFi is the UUID of the Processor. This means that all of the nodes
+        	within a cluster will use the same Consumer Group Identifier so that they do
+        	not receive duplicate data but multiple GetKafka Processors can be used to pull
+        	from multiple Topics, as each Processor will receive a different Processor UUID 
+        	and therefore a different Consumer Group Identifier.
+        </p>
+
+        <p>
+            <strong>Modifies Attributes:</strong>
+        </p>
+        <table border="1">
+            <thead>
+                <tr>
+                    <th>Attribute Name</th>
+                    <th>Description</th>
+                </tr>
+            </thead>
+            <tbody>
+                <tr>
+                    <td>kafka.key</td>
+                    <td>The key of the Kafka message, if it exists. If the message does not have a key,
+                    this attribute will not be added.</td>
+                </tr>
+                <tr>
+                	<td>kafka.topic</td>
+                	<td>The name of the Kafka Topic from which the message was received</td>
+                </tr>
+                <tr>
+                	<td>kafka.partition</td>
+                	<td>The partition of the Kafka Topic from which the message was received</td>
+                </tr>
+                <tr>
+                	<td>kafka.offset</td>
+                	<td>The offset of the message within the Kafka partition</td>
+                </tr>
+            </tbody>
+        </table>
+
+
+        <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>In the list below, the names of required properties appear
+            in bold. Any other properties (not in bold) are considered optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.</p>
+        <ul>
+            <li><strong>ZooKeeper Connection String</strong>
+                <ul>
+                    <li>The Connection String to use in order to connect to ZooKeeper. This is often a 
+                    	comma-separated list of &lt;host&gt;:&lt;port&gt; combinations. For example, 
+                    	host1:2181,host2:2181,host3:2188</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Topic Name</strong>
+                <ul>
+                    <li>The Kafka Topic to pull messages from</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Zookeeper Commit Frequency</strong>
+                <ul>
+                    <li>Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. 
+                    	A longer time period will result in better overall performance but can result in more data 
+                    	duplication if a NiFi node is lost
+                    </li>
+                    <li>Default value: 60 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>ZooKeeper Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from ZooKeeper before determining that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Kafka Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from Kafka before determining that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Client Name</strong>
+                <ul>
+                    <li>Client Name to use when communicating with Kafka</li>
+                    <li>Default value: "NiFi-" followed by the UUID of the Processor</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            
+        </ul>
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>success
+                <ul>
+                    <li>All messages that are received from Kafka are routed to the 'success' relationship</li>
+                </ul>
+            </li>
+        </ul>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
new file mode 100644
index 0000000..38256c5
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -0,0 +1,177 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PutKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>
+        	This Processor puts the contents of a FlowFile to a Topic in 
+        	<a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
+        	a FlowFile become the contents of a single message in Kafka.
+        	This message is optionally assigned a key by using the
+        	&lt;Kafka Key&gt; Property.
+        </p>
+
+
+        <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>In the list below, the names of required properties appear
+            in bold. Any other properties (not in bold) are considered optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.</p>
+        <ul>
+            <li><strong>Known Brokers</strong>
+                <ul>
+                    <li>
+                    	A comma-separated list of known Kafka Brokers in the format 
+                    	&lt;host&gt;:&lt;port&gt;. This list does not need to be
+                    	exhaustive but provides a mechanism for determining which
+                    	other nodes belong to the Kafka cluster.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Topic Name</strong>
+                <ul>
+                    <li>The Kafka Topic to send messages to. While the GetKafka
+                    	Processor requires a statically named Topic so that it knows
+                    	where to fetch messages from, the PutKafka Processor does allow
+                    	the Expression Language to be used so that a single PutKafka
+                    	Processor can be used to send messages to many different Kafka
+                    	topics.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
+            
+            <li>Kafka Key
+                <ul>
+                    <li>
+                    	The Key to use for the Message. If no value is given, the message
+                    	will not be given a Key.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
+            <li><strong>Delivery Guarantee</strong>
+                <ul>
+                    <li>
+                    	Specifies the requirement for guaranteeing that a message is sent to Kafka.
+                    	This Property can have one of three different values:
+                    	<ul>
+                    		<li>
+                    			<b>Guarantee Replicated Delivery</b> - FlowFile will be routed to 
+	                    			failure unless the message is replicated to the appropriate number 
+	                    			of Kafka Nodes according to the Topic configuration
+	                    	</li>
+                    		<li>
+                    			<b>Guarantee Single Node Delivery</b> - FlowFile will be routed to 
+                    				success if the message is received by a single Kafka node, 
+                    				whether or not it is replicated. This is faster than 
+                    				&lt;Guarantee Replicated Delivery&gt; but can result in data loss 
+                    				if a Kafka node crashes
+                    		</li>
+                    		<li>
+                    			<b>Best Effort</b> - FlowFile will be routed to success after successfully 
+                    				writing the content to a Kafka node, without waiting for a response. 
+                    				This provides the best performance but may result in data loss.
+                    		</li>
+                    	</ul>
+                    </li>
+                    <li>Default value: Best Effort</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Communications Timeout</strong>
+                <ul>
+                    <li>
+                    	The amount of time to wait for a response from Kafka before determining 
+                    	that there is a communications error
+                    </li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Max FlowFile Size</strong>
+                <ul>
+                    <li>
+                    	Specifies the amount of data that can be buffered to send to Kafka. Because
+                    	the contents of the FlowFile must be buffered into memory before they can
+                    	be sent to Kafka, attempting to send a very large FlowFile can cause
+                    	problems by causing the machine to run out of memory.
+                    	To help prevent the system from running out of memory, the PutKafka
+                    	Processor exposes a property for specifying the maximum size of a FlowFile.
+                    	If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
+                    </li>
+                    <li>Default value: 1 MB</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Client Name</strong>
+                <ul>
+                    <li>Client Name to use when communicating with Kafka</li>
+                    <li>Default value: "NiFi-" followed by the UUID of the Processor</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+        </ul>
+        
+        
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>success
+                <ul>
+                    <li>All FlowFiles that are successfully sent to Kafka are routed 
+                    	to this relationship.
+                    </li>
+                </ul>
+            </li>
+            
+            <li>reject
+                <ul>
+                    <li>Any FlowFile whose content size exceeds the configured value for 
+                    	the &lt;Max FlowFile Size&gt; property will be routed to this 
+                    	relationship.
+                    </li>
+                </ul>
+            </li>
+            
+            <li>failure
+                <ul>
+                    <li>All FlowFiles that cannot be sent to Kafka for any reason other 
+                    	than their content size exceeding the value of the &lt;Max FlowFile 
+                    	Size&gt; property will be routed to this relationship.
+                    </li>
+                </ul>
+            </li>
+            
+        </ul>
+
+    </body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
new file mode 100644
index 0000000..2199a9c
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.List;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Manual integration test for {@link GetKafka}.
+ *
+ * The class is annotated with {@code @Ignore}, so it is skipped by default. It
+ * expects a ZooKeeper instance reachable at {@link #ZOOKEEPER_CONNECTION} and
+ * simply prints whatever FlowFiles the processor routes to 'success'; it makes
+ * no assertions.
+ */
+@Ignore("Intended only for local tests to verify functionality.")
+public class TestGetKafka {
+
+    /** ZooKeeper connection string (host:port) handed to the processor under test. */
+    public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
+
+    /**
+     * Sets the slf4j simple-logger level to INFO for the Kafka client and for the
+     * NiFi Kafka processors, then installs the default log4j configuration.
+     */
+    @BeforeClass
+    public static void configureLogging() {
+        System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO");
+        BasicConfigurator.configure();
+    }
+
+    /**
+     * Configures a GetKafka processor against the local ZooKeeper, triggers it via
+     * {@code runner.run(20, false)}, and dumps the attributes and content of every
+     * FlowFile transferred to {@link GetKafka#REL_SUCCESS} to standard out.
+     */
+    @Test
+    public void testIntegrationLocally() {
+        final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+
+        runner.run(20, false);
+
+        final List<MockFlowFile> received = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
+        for (final MockFlowFile flowFile : received) {
+            System.out.println(flowFile.getAttributes());
+            // Decode explicitly as UTF-8 rather than with the platform default charset,
+            // so the printed content does not depend on the machine running the test.
+            System.out.println(new String(flowFile.toByteArray(), java.nio.charset.StandardCharsets.UTF_8));
+            System.out.println();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
new file mode 100644
index 0000000..2e6aacf
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -0,0 +1,48 @@
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+/**
+ * Manual integration test for {@link PutKafka}.
+ *
+ * The class is annotated with {@code @Ignore}, so it is skipped by default. It
+ * expects a Kafka broker reachable at the address configured in
+ * {@link #testKeyValuePut()}.
+ */
+@Ignore("Intended only for local testing to verify functionality.")
+public class TestPutKafka {
+
+    /**
+     * Enqueues four identical FlowFiles whose topic and key are supplied through
+     * the {@code kafka.topic} and {@code kafka.key} attributes (resolved by the
+     * Expression Language in the processor properties), runs the processor, and
+     * verifies that all four are routed to {@link PutKafka#REL_SUCCESS} with the
+     * first one's content unchanged.
+     */
+    @Test
+    public void testKeyValuePut() {
+        final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+        runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+        runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+        runner.setProperty(PutKafka.KEY, "${kafka.key}");
+        runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+        runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("kafka.topic", "test");
+        attributes.put("kafka.key", "key3");
+
+        // Encode explicitly as UTF-8 so the bytes sent to Kafka do not depend on the
+        // platform default charset of the machine running the test.
+        final byte[] data = "Hello, World, Again! ;)".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        final int flowFileCount = 4;
+        for (int i = 0; i < flowFileCount; i++) {
+            runner.enqueue(data, attributes);
+        }
+
+        runner.run(5);
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, flowFileCount);
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+        final MockFlowFile mff = mffs.get(0);
+
+        assertTrue(Arrays.equals(data, mff.toByteArray()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/pom.xml b/nar-bundles/kafka-bundle/pom.xml
new file mode 100644
index 0000000..146db12
--- /dev/null
+++ b/nar-bundles/kafka-bundle/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nar-bundle-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+	<modelVersion>4.0.0</modelVersion>
+	
+  <artifactId>kafka-bundle</artifactId>
+  <packaging>pom</packaging>
+
+  <name>kafka-bundle</name>
+  
+  <modules>
+	<module>kafka-processors</module>
+	<module>kafka-nar</module>
+  </modules>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1de1a0e..68e718a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -768,6 +768,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>kafka-nar</artifactId>
+                <version>${project.version}</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>${project.version}</version>
             </dependency>