Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/06 16:17:30 UTC

[1/7] incubator-nifi git commit: NIFI-220: Initial Import of GetKafka and PutKafka processors

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop d8b4a781d -> 2d0b16396


NIFI-220: Initial Import of GetKafka and PutKafka processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/3e2f7906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/3e2f7906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/3e2f7906

Branch: refs/heads/develop
Commit: 3e2f79067251adfd2846a3aea42c052e7cae8e92
Parents: 68b7ad7
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jan 1 20:52:36 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jan 1 20:52:36 2015 -0500

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 +
 nar-bundles/kafka-bundle/kafka-nar/pom.xml      |  37 +++
 .../kafka-bundle/kafka-processors/pom.xml       |  76 ++++++
 .../apache/nifi/processors/kafka/GetKafka.java  | 259 +++++++++++++++++++
 .../apache/nifi/processors/kafka/PutKafka.java  | 252 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../index.html                                  | 147 +++++++++++
 .../index.html                                  | 177 +++++++++++++
 .../nifi/processors/kafka/TestGetKafka.java     |  59 +++++
 .../nifi/processors/kafka/TestPutKafka.java     |  48 ++++
 nar-bundles/kafka-bundle/pom.xml                |  35 +++
 pom.xml                                         |   6 +
 12 files changed, 1103 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/assembly/pom.xml
----------------------------------------------------------------------
diff --git a/assembly/pom.xml b/assembly/pom.xml
index ae74485..0d00da3 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -203,6 +203,11 @@
             <artifactId>hadoop-nar</artifactId>
             <type>nar</type>
         </dependency>
+		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>kafka-nar</artifactId>
+			<type>nar</type>
+		</dependency>
     </dependencies>
     
     <properties>        

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-nar/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-nar/pom.xml b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
new file mode 100644
index 0000000..8dfefdd
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-nar/pom.xml
@@ -0,0 +1,37 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>kafka-nar</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>NiFi Kafka NAR</name>
+    <packaging>nar</packaging>
+    <description>NiFi NAR for interacting with Apache Kafka</description>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>kafka-processors</artifactId>
+			<version>${project.version}</version>
+        </dependency>
+    </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/pom.xml b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
new file mode 100644
index 0000000..8cad323
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/pom.xml
@@ -0,0 +1,76 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>kafka-bundle</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+	<modelVersion>4.0.0</modelVersion>
+	
+  <artifactId>kafka-processors</artifactId>
+  <packaging>jar</packaging>
+
+  <name>kafka-processors</name>
+  
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.nifi</groupId>
+      <artifactId>nifi-processor-utils</artifactId>
+    </dependency>
+    <dependency>
+    	<groupId>org.apache.nifi</groupId>
+    	<artifactId>nifi-utils</artifactId>
+    </dependency>
+	<dependency>
+	    <groupId>org.apache.kafka</groupId>
+	    <artifactId>kafka_2.8.2</artifactId>
+	    <version>0.8.1</version>
+	    <exclusions>
+	    	<!-- Transitive dependencies excluded because they are located 
+	    		 in a legacy Maven repository, which Maven 3 doesn't support. -->
+	    	<exclusion>
+	    		<groupId>javax.jms</groupId>
+	    		<artifactId>jms</artifactId>
+	    	</exclusion>
+	    	<exclusion>
+	    		<groupId>com.sun.jdmk</groupId>
+	    		<artifactId>jmxtools</artifactId>
+	    	</exclusion>
+	    	<exclusion>
+	    		<groupId>com.sun.jmx</groupId>
+	    		<artifactId>jmxri</artifactId>
+	    	</exclusion>
+	    </exclusions>
+	</dependency>
+	
+	<dependency>
+		<groupId>org.apache.nifi</groupId>
+		<artifactId>nifi-mock</artifactId>
+		<scope>test</scope>
+	</dependency>
+	<dependency>
+		<groupId>org.slf4j</groupId>
+		<artifactId>slf4j-simple</artifactId>
+		<scope>test</scope>
+	</dependency>
+  </dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
new file mode 100644
index 0000000..55c67e3
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -0,0 +1,259 @@
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import kafka.consumer.Consumer;
+import kafka.consumer.ConsumerConfig;
+import kafka.consumer.ConsumerIterator;
+import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
+import kafka.message.MessageAndMetadata;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnScheduled;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.OnUnscheduled;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+
+@SupportsBatching
+@CapabilityDescription("Fetches messages from Apache Kafka")
+@Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"})
+public class GetKafka extends AbstractProcessor {
+    public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder()
+        .name("ZooKeeper Connection String")
+        .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port> combinations. For example, host1:2181,host2:2181,host3:2188")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+        .name("Topic Name")
+        .description("The Kafka Topic to pull messages from")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
+		.name("Zookeeper Commit Frequency")
+		.description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will result in better overall performance but can result in more data duplication if a NiFi node is lost")
+		.required(true)
+		.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+		.expressionLanguageSupported(false)
+		.defaultValue("60 secs")
+		.build();
+    public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
+	    .name("ZooKeeper Communications Timeout")
+	    .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
+	    .required(true)
+	    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .defaultValue("30 secs")
+	    .build();
+    public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
+	    .name("Kafka Communications Timeout")
+	    .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+	    .required(true)
+	    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .defaultValue("30 secs")
+	    .build();
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+        .name("Client Name")
+        .description("Client Name to use when communicating with Kafka")
+        .required(true)
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .build();
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+	    .name("success")
+	    .description("All FlowFiles that are created are routed to this relationship")
+	    .build();
+
+    
+    private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
+    private volatile ConsumerConnector consumer;
+
+    final Lock interruptionLock = new ReentrantLock();
+    // guarded by interruptionLock
+    private final Set<Thread> interruptableThreads = new HashSet<>();
+    
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    	final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder()
+    		.fromPropertyDescriptor(CLIENT_NAME)
+    		.defaultValue("NiFi-" + getIdentifier())
+    		.build();
+    	
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ZOOKEEPER_CONNECTION_STRING);
+        props.add(TOPIC);
+        props.add(ZOOKEEPER_COMMIT_DELAY);
+        props.add(clientNameWithDefault);
+        props.add(KAFKA_TIMEOUT);
+        props.add(ZOOKEEPER_TIMEOUT);
+        return props;
+    }
+    
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>(1);
+        relationships.add(REL_SUCCESS);
+        return relationships;
+    }
+    
+    @OnScheduled
+    public void createConsumers(final ProcessContext context) {
+    	final String topic = context.getProperty(TOPIC).getValue();
+    	
+    	final Map<String, Integer> topicCountMap = new HashMap<>(1);
+    	topicCountMap.put(topic, context.getMaxConcurrentTasks());
+    	
+    	final Properties props = new Properties();
+    	props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); 
+    	props.setProperty("group.id", getIdentifier());
+    	props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
+    	props.setProperty("auto.commit.enable", "true"); // just be explicit
+    	props.setProperty("auto.offset.reset", "smallest");
+    	
+    	final ConsumerConfig consumerConfig = new ConsumerConfig(props);
+    	consumer = Consumer.createJavaConsumerConnector(consumerConfig);
+    	
+    	final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
+    	final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
+    	
+    	this.streamIterators.clear();
+    	
+    	for ( final KafkaStream<byte[], byte[]> stream : streams ) {
+    		streamIterators.add(stream.iterator());
+    	}
+    }
+    
+    @OnStopped
+    public void shutdownConsumer() {
+    	if ( consumer != null ) {
+    		try {
+    			consumer.commitOffsets();
+    		} finally {
+    			consumer.shutdown();
+    		}
+    	}
+    }
+    
+    @OnUnscheduled
+    public void interruptIterators() {
+    	// Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
+    	// interrupt the Threads. We do this when the Processor is stopped so that we have the
+    	// ability to shutdown the Processor.
+    	interruptionLock.lock();
+    	try {
+    		for ( final Thread t : interruptableThreads ) {
+    			t.interrupt();
+    		}
+    		
+    		interruptableThreads.clear();
+    	} finally {
+    		interruptionLock.unlock();
+    	}
+    }
+    
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    	ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+    	if ( iterator == null ) {
+    		return;
+    	}
+    	
+    	FlowFile flowFile = null;
+    	try {
+    		interruptionLock.lock();
+    		try {
+    			interruptableThreads.add(Thread.currentThread());
+    		} finally {
+    			interruptionLock.unlock();
+    		}
+    		
+    		try {
+	    		if (!iterator.hasNext() ) {
+	    			return;
+	    		}
+    		} catch (final Exception e) {
+    			getLogger().warn("Failed to invoke hasNext() due to {}", new Object[] {e});
+    			iterator = null;
+    			return;
+    		}
+    		
+    		final long start = System.nanoTime();
+    		final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+    		
+    		if ( mam == null ) {
+    			return;
+    		}
+    		
+    		final byte[] key = mam.key();
+    		
+    		final Map<String, String> attributes = new HashMap<>();
+    		if ( key != null ) {
+    			attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+    		}
+    		attributes.put("kafka.offset", String.valueOf(mam.offset()));
+    		attributes.put("kafka.partition", String.valueOf(mam.partition()));
+    		attributes.put("kafka.topic", mam.topic());
+    		
+    		flowFile = session.create();
+    		flowFile = session.write(flowFile, new OutputStreamCallback() {
+				@Override
+				public void process(final OutputStream out) throws IOException {
+					out.write(mam.message());
+				}
+    		});
+    		
+    		flowFile = session.putAllAttributes(flowFile, attributes);
+    		final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+    		session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
+    		getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
+    		session.transfer(flowFile, REL_SUCCESS);
+    	} catch (final Exception e) {
+    		getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
+    		if ( flowFile != null ) {
+    			session.remove(flowFile);
+    		}
+    	} finally {
+    		interruptionLock.lock();
+    		try {
+    			interruptableThreads.remove(Thread.currentThread());
+    		} finally {
+    			interruptionLock.unlock();
+    		}
+    		
+    		if ( iterator != null ) {
+    			streamIterators.offer(iterator);
+    		}
+    	}
+    }
+	
+}
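
The processor above uses the Kafka 0.8 high-level consumer API. For readers unfamiliar with that API, here is a minimal standalone sketch of the same pattern outside of NiFi; the ZooKeeper address, group id, and topic name are placeholders rather than values taken from this commit:

    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.MessageAndMetadata;

    public class ConsumerSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.setProperty("zookeeper.connect", "localhost:2181"); // placeholder; GetKafka reads this from a property
            props.setProperty("group.id", "example-group");           // GetKafka uses the Processor's UUID here
            props.setProperty("auto.offset.reset", "smallest");

            final ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // Request a single stream for the topic; GetKafka requests one stream per concurrent task instead.
            final Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                    consumer.createMessageStreams(Collections.singletonMap("example-topic", 1));
            final ConsumerIterator<byte[], byte[]> iterator = streams.get("example-topic").get(0).iterator();

            // hasNext() blocks until a message arrives, which is why GetKafka interrupts its threads on shutdown.
            while (iterator.hasNext()) {
                final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
                System.out.println("partition=" + mam.partition() + " offset=" + mam.offset()
                        + " value=" + new String(mam.message()));
            }
            consumer.shutdown();
        }
    }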

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
new file mode 100644
index 0000000..5e5940c
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -0,0 +1,252 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.annotation.CapabilityDescription;
+import org.apache.nifi.processor.annotation.OnStopped;
+import org.apache.nifi.processor.annotation.SupportsBatching;
+import org.apache.nifi.processor.annotation.Tags;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.StreamUtils;
+
+@SupportsBatching
+@Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka")
+public class PutKafka extends AbstractProcessor {
+    private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
+    private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
+    
+    public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("-1", "Guarantee Replicated Delivery", "FlowFile will be routed to failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
+    public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes");
+    public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result in data loss.");
+    
+    public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder()
+        .name("Known Brokers")
+        .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
+        .required(true)
+        .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
+        .expressionLanguageSupported(false)
+        .build();
+    public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
+	    .name("Topic Name")
+	    .description("The Kafka Topic of interest")
+	    .required(true)
+	    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+	    .expressionLanguageSupported(true)
+	    .build();
+    public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
+		.name("Kafka Key")
+		.description("The Key to use for the Message")
+		.required(false)
+		.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+		.expressionLanguageSupported(true)
+		.build();
+    public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
+		.name("Delivery Guarantee")
+		.description("Specifies the requirement for guaranteeing that a message is sent to Kafka")
+		.required(true)
+		.expressionLanguageSupported(false)
+		.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
+		.defaultValue(DELIVERY_BEST_EFFORT.getValue())
+		.build();
+    public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
+	    .name("Communications Timeout")
+	    .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
+	    .required(true)
+	    .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .defaultValue("30 secs")
+	    .build();
+    public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
+		.name("Max FlowFile Size")
+		.description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
+		.required(true)
+		.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+		.expressionLanguageSupported(false)
+		.defaultValue("1 MB")
+		.build();
+    public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
+	    .name("Client Name")
+	    .description("Client Name to use when communicating with Kafka")
+	    .required(true)
+	    .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+	    .expressionLanguageSupported(false)
+	    .build();
+
+    
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+	    .name("success")
+	    .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship")
+	    .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+	    .name("failure")
+	    .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
+	    .build();
+    public static final Relationship REL_REJECT = new Relationship.Builder()
+	    .name("reject")
+	    .description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
+	    .build();
+
+    private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
+    
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+    	final PropertyDescriptor clientName = new PropertyDescriptor.Builder()
+    		.fromPropertyDescriptor(CLIENT_NAME)
+    		.defaultValue("NiFi-" + getIdentifier())
+    		.build();
+    	
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(SEED_BROKERS);
+        props.add(TOPIC);
+        props.add(KEY);
+        props.add(DELIVERY_GUARANTEE);
+        props.add(TIMEOUT);
+        props.add(MAX_FLOWFILE_SIZE);
+        props.add(clientName);
+        return props;
+    }
+    
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>(1);
+        relationships.add(REL_SUCCESS);
+        relationships.add(REL_FAILURE);
+        relationships.add(REL_REJECT);
+        return relationships;
+    }
+    
+    
+    @OnStopped
+    public void closeProducers() {
+    	Producer<byte[], byte[]> producer;
+    	
+    	while ((producer = producers.poll()) != null) {
+    		producer.close();
+    	}
+    }
+    
+    
+    private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+    	final String brokers = context.getProperty(SEED_BROKERS).getValue();
+
+    	final Properties properties = new Properties();
+        properties.setProperty("metadata.broker.list", brokers);
+        properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
+        properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
+        properties.setProperty("request.timeout.ms", String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()));
+        
+        properties.setProperty("message.send.max.retries", "1");
+        properties.setProperty("producer.type", "sync");
+        
+        final ProducerConfig config = new ProducerConfig(properties);
+        return new Producer<>(config);
+    }
+    
+    private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
+    	Producer<byte[], byte[]> producer = producers.poll();
+    	return producer == null ? createProducer(context) : producer;
+    }
+    
+    private void returnProducer(final Producer<byte[], byte[]> producer) {
+    	producers.offer(producer);
+    }
+    
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+    	FlowFile flowFile = session.get();
+    	if ( flowFile == null ) {
+    		return;
+    	}
+    	
+    	final long start = System.nanoTime();
+    	final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
+    	if ( flowFile.getSize() > maxSize ) {
+    		getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
+    		session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
+    		session.transfer(flowFile, REL_REJECT);
+    		return;
+    	}
+    	
+        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
+        final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        
+        final byte[] value = new byte[(int) flowFile.getSize()];
+        session.read(flowFile, new InputStreamCallback() {
+			@Override
+			public void process(final InputStream in) throws IOException {
+				StreamUtils.fillBuffer(in, value);
+			}
+        });
+        
+        final Producer<byte[], byte[]> producer = borrowProducer(context);
+        boolean error = false;
+        try {
+        	final KeyedMessage<byte[], byte[]> message;
+        	if ( key == null ) {
+        		message = new KeyedMessage<>(topic, value);
+        	} else {
+        		message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
+        	}
+        	
+        	producer.send(message);
+        	final long nanos = System.nanoTime() - start;
+        	
+        	session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+        	session.transfer(flowFile, REL_SUCCESS);
+        	getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+        } catch (final Exception e) {
+        	getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+        	session.transfer(flowFile, REL_FAILURE);
+        	error = true;
+        } finally {
+        	if ( error ) {
+        		producer.close();
+        	} else {
+        		returnProducer(producer);
+        	}
+        }
+    }
+
+}
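
On the producer side, PutKafka's createProducer() builds a Kafka 0.8 kafka.javaapi.producer.Producer from a Properties object. A minimal standalone sketch of the same pattern, with a placeholder broker and topic:

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    public class ProducerSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.setProperty("metadata.broker.list", "localhost:9092"); // placeholder; PutKafka reads this from <Known Brokers>
            props.setProperty("request.required.acks", "1");             // the value PutKafka derives from <Delivery Guarantee>
            props.setProperty("producer.type", "sync");

            final Producer<byte[], byte[]> producer = new Producer<>(new ProducerConfig(props));
            try {
                // The key is optional, exactly as in PutKafka's onTrigger()
                final KeyedMessage<byte[], byte[]> message =
                        new KeyedMessage<>("example-topic", "key1".getBytes(), "Hello, Kafka".getBytes());
                producer.send(message);
            } finally {
                producer.close();
            }
        }
    }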

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..6ae3da1
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,2 @@
+org.apache.nifi.processors.kafka.GetKafka
+org.apache.nifi.processors.kafka.PutKafka
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
new file mode 100644
index 0000000..d429d6b
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -0,0 +1,147 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>GetKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>
+        	This Processor polls <a href="http://kafka.apache.org/">Apache Kafka</a>
+        	for data. When a message is received from Kafka, this Processor emits a FlowFile
+        	where the content of the FlowFile is the value of the Kafka message. If the
+        	message has a key associated with it, an attribute named <code>kafka.key</code>
+        	will be added to the FlowFile, with the value being the UTF-8 Encoded value
+        	of the Message's Key.
+        </p>
+        <p>
+        	Kafka supports the notion of a Consumer Group when pulling messages in order to
+        	provide scalability while still offering a publish-subscribe interface. Each
+        	Consumer Group must have a unique identifier. The Consumer Group identifier that
+        	is used by NiFi is the UUID of the Processor. This means that all of the nodes
+        	within a cluster will use the same Consumer Group Identifier so that they do
+        	not receive duplicate data. Multiple GetKafka Processors can still be used to
+        	pull from multiple Topics, as each Processor receives a different UUID and
+        	therefore a different Consumer Group Identifier.
+        </p>
+
+        <p>
+            <strong>Modifies Attributes:</strong>
+        </p>
+        <table border="1">
+            <thead>
+                <tr>
+                    <th>Attribute Name</th>
+                    <th>Description</th>
+                </tr>
+            </thead>
+            <tbody>
+                <tr>
+                    <td>kafka.key</td>
+                    <td>The key of the Kafka message, if it exists. If the message does not have a key,
+                    this attribute will not be added.</td>
+                </tr>
+                <tr>
+                	<td>kafka.topic</td>
+                	<td>The name of the Kafka Topic from which the message was received</td>
+                </tr>
+                <tr>
+                	<td>kafka.partition</td>
+                	<td>The partition of the Kafka Topic from which the message was received</td>
+                </tr>
+                <tr>
+                	<td>kafka.offset</td>
+                	<td>The offset of the message within the Kafka partition</td>
+                </tr>
+            </tbody>
+        </table>
+
+
+        <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>In the list below, the names of required properties appear
+            in bold. Any other properties (not in bold) are considered optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.</p>
+        <ul>
+            <li><strong>ZooKeeper Connection String</strong>
+                <ul>
+                    <li>The Connection String to use in order to connect to ZooKeeper. This is often a 
+                    	comma-separated list of &lt;host&gt;:&lt;port&gt; combinations. For example, 
+                    	host1:2181,host2:2181,host3:2188</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Topic Name</strong>
+                <ul>
+                    <li>The Kafka Topic to pull messages from</li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Zookeeper Commit Frequency</strong>
+                <ul>
+                    <li>Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. 
+                    	A longer time period will result in better overall performance but can result in more data 
+                    	duplication if a NiFi node is lost
+                    </li>
+                    <li>Default value: 60 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>ZooKeeper Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from ZooKeeper before determining that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Kafka Communications Timeout</strong>
+                <ul>
+                    <li>The amount of time to wait for a response from Kafka before determining that there is a communications error</li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Client Name</strong>
+                <ul>
+                    <li>Client Name to use when communicating with Kafka</li>
+                    <li>Default value: "NiFi-" followed by the UUID of the Processor</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            
+        </ul>
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>success
+                <ul>
+                    <li>All messages that are received from Kafka are routed to the 'success' relationship</li>
+                </ul>
+            </li>
+        </ul>
+
+    </body>
+</html>
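
The properties and attributes documented above can be exercised with the nifi-mock TestRunner in the same way as the TestGetKafka class included in this commit. A rough sketch, assuming a reachable ZooKeeper/Kafka instance at the placeholder address and the nifi-mock assertion methods:

    final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
    runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181"); // placeholder
    runner.setProperty(GetKafka.TOPIC, "example-topic");

    runner.run(10, false); // false: do not invoke the @OnStopped methods at the end of the run

    for (final MockFlowFile flowFile : runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS)) {
        // each FlowFile carries the attributes listed in the table above
        flowFile.assertAttributeExists("kafka.topic");
        flowFile.assertAttributeExists("kafka.partition");
        flowFile.assertAttributeExists("kafka.offset");
    }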

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
new file mode 100644
index 0000000..38256c5
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -0,0 +1,177 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>PutKafka</title>
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>
+        	This Processor puts the contents of a FlowFile to a Topic in 
+        	<a href="http://kafka.apache.org/">Apache Kafka</a>. The full contents of
+        	a FlowFile become the contents of a single message in Kafka.
+        	This message is optionally assigned a key by using the
+        	&lt;Kafka Key&gt; Property.
+        </p>
+
+
+        <p>
+            <strong>Properties:</strong>
+        </p>
+        <p>In the list below, the names of required properties appear
+            in bold. Any other properties (not in bold) are considered optional.
+            If a property has a default value, it is indicated. If a property
+            supports the use of the NiFi Expression Language (or simply,
+            "expression language"), that is also indicated.</p>
+        <ul>
+            <li><strong>Known Brokers</strong>
+                <ul>
+                    <li>
+                    	A comma-separated list of known Kafka Brokers in the format 
+                    	&lt;host&gt;:&lt;port&gt;. This list does not need to be
+                    	exhaustive but provides a mechanism for determining which
+                    	other nodes belong to the Kafka cluster.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Topic Name</strong>
+                <ul>
+                    <li>The Kafka Topic to send messages to. While the GetKafka
+                    	Processor requires a statically named Topic so that it knows
+                    	where to fetch messages from, the PutKafka Processor does allow
+                    	the Expression Language to be used so that a single PutKafka
+                    	Processor can be used to send messages to many different Kafka
+                    	topics.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
+            
+            <li>Kafka Key
+                <ul>
+                    <li>
+                    	The Key to use for the Message. If no value is given, the message
+                    	will not be given a Key.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
+            <li><strong>Delivery Guarantee</strong>
+                <ul>
+                    <li>
+                    	Specifies the requirement for guaranteeing that a message is sent to Kafka.
+                    	This Property can have one of three different values:
+                    	<ul>
+                    		<li>
+                    			<b>Guarantee Replicated Delivery</b> - FlowFile will be routed to 
+	                    			failure unless the message is replicated to the appropriate number 
+	                    			of Kafka Nodes according to the Topic configuration
+	                    	</li>
+                    		<li>
+                    			<b>Guarantee Single Node Delivery</b> - FlowFile will be routed to 
+                    				success if the message is received by a single Kafka node, 
+                    				whether or not it is replicated. This is faster than 
+                    				&lt;Guarantee Replicated Delivery&gt; but can result in data loss 
+                    				if a Kafka node crashes
+                    		</li>
+                    		<li>
+                    			<b>Best Effort</b> - FlowFile will be routed to success after successfully 
+                    				writing the content to a Kafka node, without waiting for a response. 
+                    				This provides the best performance but may result in data loss.
+                    		</li>
+                    	</ul>
+                    </li>
+                    <li>Default value: Best Effort</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Communications Timeout</strong>
+                <ul>
+                    <li>
+                    	The amount of time to wait for a response from Kafka before determining 
+                    	that there is a communications error
+                    </li>
+                    <li>Default value: 30 secs</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Max FlowFile Size</strong>
+                <ul>
+                    <li>
+                    	Specifies the amount of data that can be buffered to send to Kafka. Because
+                    	the contents of the FlowFile must be buffered into memory before they can
+                    	be sent to Kafka, attempting to send a very large FlowFile can cause
+                    	the machine to run out of memory. To help prevent the system from
+                    	running out of memory, the PutKafka Processor exposes this property for
+                    	specifying the maximum size of a FlowFile.
+                    	If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
+                    </li>
+                    <li>Default value: 1 MB</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            <li><strong>Client Name</strong>
+                <ul>
+                    <li>Client Name to use when communicating with Kafka</li>
+                    <li>Default value: "NiFi-" followed by the UUID of the Processor</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+        </ul>
+        
+        
+        <p>
+            <strong>Relationships:</strong>
+        </p>
+        <ul>
+            <li>success
+                <ul>
+                    <li>All FlowFiles that are successfully sent to Kafka are routed 
+                    	to this relationship.
+                    </li>
+                </ul>
+            </li>
+            
+            <li>reject
+                <ul>
+                    <li>Any FlowFile whose content size exceeds the configured value for 
+                    	the &lt;Max FlowFile Size&gt; property will be routed to this 
+                    	relationship.
+                    </li>
+                </ul>
+            </li>
+            
+            <li>failure
+                <ul>
+                    <li>All FlowFiles that cannot be sent to Kafka for any reason other 
+                    	than their content size exceeding the value of the &lt;Max FlowFile 
+                    	Size&gt; property will be routed to this relationship.
+                    </li>
+                </ul>
+            </li>
+            
+        </ul>
+
+    </body>
+</html>
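
The 'reject' relationship described above can be demonstrated without a running Kafka broker, because PutKafka checks the FlowFile size before it ever contacts Kafka. A short sketch using the nifi-mock TestRunner (property values are illustrative only):

    final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
    runner.setProperty(PutKafka.SEED_BROKERS, "localhost:9092"); // placeholder; never contacted in this scenario
    runner.setProperty(PutKafka.TOPIC, "example-topic");
    runner.setProperty(PutKafka.MAX_FLOWFILE_SIZE, "1 KB");

    runner.enqueue(new byte[2048]); // content larger than Max FlowFile Size
    runner.run();

    runner.assertAllFlowFilesTransferred(PutKafka.REL_REJECT, 1);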

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
new file mode 100644
index 0000000..2199a9c
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.kafka;
+
+import java.util.List;
+
+import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore("Intended only for local tests to verify functionality.")
+public class TestGetKafka {
+
+	public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
+	
+    @BeforeClass
+    public static void configureLogging() {
+    	System.setProperty("org.slf4j.simpleLogger.log.kafka", "INFO");
+        System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.kafka", "INFO");
+        BasicConfigurator.configure();
+    }
+    
+    @Test
+    public void testIntegrationLocally() {
+        final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        
+        runner.run(20, false);
+        
+        final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS);
+        for ( final MockFlowFile flowFile : flowFiles ) {
+        	System.out.println(flowFile.getAttributes());
+        	System.out.println(new String(flowFile.toByteArray()));
+        	System.out.println();
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
new file mode 100644
index 0000000..2e6aacf
--- /dev/null
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -0,0 +1,48 @@
+package org.apache.nifi.processors.kafka;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Ignore;
+import org.junit.Test;
+
+
+@Ignore("Intended only for local testing to verify functionality.")
+public class TestPutKafka {
+
+	@Test
+	public void testKeyValuePut() {
+		final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
+		runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
+		runner.setProperty(PutKafka.TOPIC, "${kafka.topic}");
+		runner.setProperty(PutKafka.KEY, "${kafka.key}");
+		runner.setProperty(PutKafka.TIMEOUT, "3 secs");
+		runner.setProperty(PutKafka.DELIVERY_GUARANTEE, PutKafka.DELIVERY_REPLICATED.getValue());
+		
+		final Map<String, String> attributes = new HashMap<>();
+		attributes.put("kafka.topic", "test");
+		attributes.put("kafka.key", "key3");
+		
+		final byte[] data = "Hello, World, Again! ;)".getBytes();
+		runner.enqueue(data, attributes);
+		runner.enqueue(data, attributes);
+		runner.enqueue(data, attributes);
+		runner.enqueue(data, attributes);
+		
+		runner.run(5);
+		
+		runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 4);
+		final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS);
+		final MockFlowFile mff = mffs.get(0);
+		
+		assertTrue(Arrays.equals(data, mff.toByteArray()));
+	}
+	
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/nar-bundles/kafka-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/pom.xml b/nar-bundles/kafka-bundle/pom.xml
new file mode 100644
index 0000000..146db12
--- /dev/null
+++ b/nar-bundles/kafka-bundle/pom.xml
@@ -0,0 +1,35 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nar-bundle-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+	<modelVersion>4.0.0</modelVersion>
+	
+  <artifactId>kafka-bundle</artifactId>
+  <packaging>pom</packaging>
+
+  <name>kafka-bundle</name>
+  
+  <modules>
+	<module>kafka-processors</module>
+	<module>kafka-nar</module>
+  </modules>
+  
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/3e2f7906/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 1de1a0e..68e718a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -768,6 +768,12 @@
             </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
+                <artifactId>kafka-nar</artifactId>
+                <version>${project.version}</version>
+                <type>nar</type>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.nifi</groupId>
                 <artifactId>nifi-properties</artifactId>
                 <version>${project.version}</version>
             </dependency>


[6/7] incubator-nifi git commit: Merge branch 'NIFI-220' into develop

Posted by ma...@apache.org.
Merge branch 'NIFI-220' into develop


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0b25df49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0b25df49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0b25df49

Branch: refs/heads/develop
Commit: 0b25df490b3c517a5b7d9c89cefe16a6173b2e16
Parents: b1e5c5e c91c7e7
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 5 20:33:32 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 5 20:33:32 2015 -0500

----------------------------------------------------------------------
 assembly/pom.xml                                |   5 +
 nar-bundles/kafka-bundle/kafka-nar/pom.xml      |  37 ++
 .../kafka-bundle/kafka-processors/pom.xml       |  76 ++++
 .../apache/nifi/processors/kafka/GetKafka.java  | 314 ++++++++++++++
 .../apache/nifi/processors/kafka/PutKafka.java  | 415 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   2 +
 .../index.html                                  | 173 ++++++++
 .../index.html                                  | 189 +++++++++
 .../nifi/processors/kafka/TestGetKafka.java     | 162 ++++++++
 .../nifi/processors/kafka/TestPutKafka.java     | 220 ++++++++++
 nar-bundles/kafka-bundle/pom.xml                |  35 ++
 pom.xml                                         |   6 +
 12 files changed, 1634 insertions(+)
----------------------------------------------------------------------



[7/7] incubator-nifi git commit: NIFI-221: included controller service needed in nar-bundles pom

Posted by ma...@apache.org.
NIFI-221: included controller service needed in nar-bundles pom


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/2d0b1639
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/2d0b1639
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/2d0b1639

Branch: refs/heads/develop
Commit: 2d0b163961723e9163c3ed34a7fdab0b5a1dfcac
Parents: 0b25df4 d8b4a78
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Jan 6 10:16:00 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Tue Jan 6 10:16:00 2015 -0500

----------------------------------------------------------------------
 .../evaluation/cast/DateCastEvaluator.java      |  3 +-
 .../main/webapp/WEB-INF/jsp/documentation.jsp   |  1 +
 .../src/main/webapp/js/application.js           |  2 +-
 .../nifi-web-ui/src/main/webapp/css/graph.css   |  5 +-
 .../nifi-web-ui/src/main/webapp/css/main.css    |  6 ++
 .../webapp/js/nf/canvas/nf-canvas-header.js     |  4 +-
 .../main/webapp/js/nf/canvas/nf-canvas-utils.js | 37 +++-------
 .../src/main/webapp/js/nf/canvas/nf-canvas.js   |  4 +-
 .../js/nf/canvas/nf-connection-configuration.js |  4 +-
 .../main/webapp/js/nf/canvas/nf-draggable.js    | 12 +++-
 .../src/main/webapp/js/nf/canvas/nf-port.js     |  2 +-
 .../js/nf/canvas/nf-processor-property-table.js |  4 +-
 .../main/webapp/js/nf/canvas/nf-processor.js    |  2 +-
 .../nf/canvas/nf-remote-process-group-ports.js  | 12 ++--
 .../js/nf/canvas/nf-remote-process-group.js     |  4 +-
 .../src/main/webapp/js/nf/nf-common.js          |  2 +-
 .../main/webapp/js/nf/nf-processor-details.js   |  4 +-
 .../webapp/js/nf/summary/nf-summary-table.js    | 16 ++---
 nar-bundles/framework-bundle/pom.xml            | 30 ++++----
 nar-bundles/hadoop-bundle/pom.xml               |  2 +-
 nar-bundles/monitor-threshold-bundle/pom.xml    |  4 +-
 .../pom.xml                                     |  2 +-
 nar-bundles/pom.xml                             | 30 ++++----
 nar-bundles/standard-bundle/pom.xml             | 10 +--
 .../nifi/processors/standard/ListenHTTP.java    | 11 +++
 .../standard/servlets/ListenHTTPServlet.java    | 14 +++-
 .../index.html                                  |  6 ++
 nar-bundles/update-attribute-bundle/pom.xml     |  6 +-
 .../pom.xml                                     |  2 +-
 nifi-docs/pom.xml                               |  2 +-
 nifi-docs/src/main/asciidoc/overview.adoc       | 74 ++++++++++----------
 pom.xml                                         | 62 ++++++++--------
 32 files changed, 199 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2d0b1639/nar-bundles/pom.xml
----------------------------------------------------------------------
diff --cc nar-bundles/pom.xml
index 7c79a48,38e69e0..d21a3bd
--- a/nar-bundles/pom.xml
+++ b/nar-bundles/pom.xml
@@@ -86,18 -85,8 +86,18 @@@
              <dependency>
                  <groupId>org.apache.nifi</groupId>
                  <artifactId>ssl-context-service</artifactId>
-                 <version>${project.version}</version>
+                 <version>0.0.1-SNAPSHOT</version>
 +            </dependency>
 +            <dependency>
 +            	<groupId>org.apache.nifi</groupId>
 +            	<artifactId>http-context-map-api</artifactId>
 +            	<version>${project.version}</version>
 +            </dependency>
 +            <dependency>
 +            	<groupId>org.apache.nifi</groupId>
 +            	<artifactId>http-context-map</artifactId>
-             	<version>${project.version}</version>
-             </dependency>
++                <version>0.0.1-SNAPSHOT</version>
+             </dependency>                      
              <dependency>
                  <groupId>org.apache.nifi</groupId>
                  <artifactId>volatile-provenance-repository</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/2d0b1639/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 68e718a,e4b2707..dc751a1
--- a/pom.xml
+++ b/pom.xml
@@@ -768,14 -768,8 +768,14 @@@
              </dependency>
              <dependency>
                  <groupId>org.apache.nifi</groupId>
 +                <artifactId>kafka-nar</artifactId>
 +                <version>${project.version}</version>
 +                <type>nar</type>
 +            </dependency>
 +            <dependency>
 +                <groupId>org.apache.nifi</groupId>
                  <artifactId>nifi-properties</artifactId>
-                 <version>${project.version}</version>
+                 <version>0.0.1-SNAPSHOT</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.nifi</groupId>


[4/7] incubator-nifi git commit: NIFI-218: Finished documenting the functions; changed formatting so that classnames can be applied to the functions, arguments, description, return type, and subject Types. This allows the values to be pulled out into the

Posted by ma...@apache.org.
NIFI-218: Finished documenting the functions; changed formatting so that classnames can be applied to the functions, arguments, description, return type, and subject Types. This allows the values to be pulled out into the UI.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/090ad386
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/090ad386
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/090ad386

Branch: refs/heads/develop
Commit: 090ad38679271019a22b6a973c18726db60fd07e
Parents: a2ecfe3
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 5 16:37:17 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 5 16:37:17 2015 -0500

----------------------------------------------------------------------
 .../asciidoc/expression-language-guide.adoc     | 949 ++++++++++++-------
 1 file changed, 619 insertions(+), 330 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/090ad386/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
----------------------------------------------------------------------
diff --git a/nifi-docs/src/main/asciidoc/expression-language-guide.adoc b/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
index 5e01064..6a6511a 100644
--- a/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/expression-language-guide.adoc
@@ -18,6 +18,7 @@ Apache NiFi Expression Language Guide
 =====================================
 Apache NiFi Team <de...@nifi.incubator.apache.org>
 :homepage: http://nifi.incubator.apache.org
+:linkcss:
 
 [[overview]]
 Overview
@@ -213,47 +214,58 @@ functions are used for performing boolean logic, such as comparing two values.
 Each of these functions returns a value of type Boolean.
 
 
-=== *isNull*
-*Description*: The `isNull` function returns `true` if the subject is null, `false` otherwise. This is typically used to determine
-	if an attribute exists. 
+[.function]
+=== isNull
+*Description*: [.description]#The `isNull` function returns `true` if the subject is null, `false` otherwise. This is typically used to determine
+if an attribute exists.#
 
-*Subject Type*: Any
+*Subject Type*: [.subject]#Any#
 
 *Arguments*: No arguments
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*:	`${filename:isNull()}` returns `true` if the "filename" attribute does not exist. 
 	It returns `false` if the attribute exists.
 
 
 
-=== *notNull*
-*Description*:The `notNull` function returns the opposite value of the `isNull` function. That is, it will return `true` if the
-	subject exists and `false` otherwise.
+[.function]
+=== notNull
+*Description*: [.description]#The `notNull` function returns the opposite value of the `isNull` function. That is, it will return `true` if the
+subject exists and `false` otherwise.#
 	
-*Subject Type*: Any
+*Subject Type*: [.subject]#Any#
 
 *Arguments*: No arguments
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: `${filename:notNull()}` returns `true` if the "filename" attribute exists. It returns "false" if the attribute
 	does not exist.
 
 
 
-=== *equals*
-*Description*: The `equals` function is very widely used and determines if its subject is equal to another String value.
+[.function]
+=== equals
+
+[.description]
+*Description*: [.description]#The `equals` function is very widely used and determines if its subject is equal to another String value.
 	Note that the `equals` function performs a direct comparison of two String values. Take care not to confuse this
-	function with the <<matches>> function, which evaluates its subject against a Regular Expression.
-	
-*Subject Type*: Any
+	function with the <<matches>> function, which evaluates its subject against a Regular Expression.#
 
-*Arguments*: The equals function takes a single argument. It is expected to be of the same type as the Subject.
+[.subject]	
+*Subject Type*: [.subject]#Any#
+
+[.arguments]
+*Arguments*:
+	
+	- [.argName]#_value_# : [.argDesc]#The value to compare the Subject to. Must be same type as the Subject.#
 
-*Return Type*: Boolean
+[.returnType]
+*Return Type*: [.returnType]#Boolean#
 
+[.examples]
 *Examples*:
 We can check if the filename of a FlowFile is "hello.txt" by using the expression `${filename:equals('hello.txt')}`,
 or we could check if the value of the attribute `hello` is equal to the value of the `filename` attribute:
@@ -261,15 +273,18 @@ or we could check if the value of the attribute `hello` is equal to the value of
 
 
 
-=== *equalsIgnoreCase*
-*Description*: Similar to the `equals` function, the `equalsIgnoreCase` function compares its subject against a String value but returns
-`true` if the two values differ only by case (upper case vs. lower case).
+[.function]
+=== equalsIgnoreCase
+*Description*: [.description]#Similar to the `equals` function, the `equalsIgnoreCase` function compares its subject against a String value but returns
+`true` if the two values differ only by case (upper case vs. lower case).#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 1: String
+*Arguments*:
 
-*Return Type*: Boolean
+	- [.argName]#_value_# : [.argDesc]#The value to compare the Subject to.#
+
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: `${filename:equalsIgnoreCase('hello.txt')}` will evaluate to `true` if filename is equal to "hello.txt" 
 	or "HELLO.TXT" or "HeLLo.TxT".
@@ -277,16 +292,19 @@ or we could check if the value of the attribute `hello` is equal to the value of
 
 
 
-=== *gt*
-*Description*: The `gt` function is used for numeric comparison and returns `true` if the subject is Greater Than 
+[.function]
+=== gt
+*Description*: [.description]#The `gt` function is used for numeric comparison and returns `true` if the subject is Greater Than 
 	its argument. If either the subject or the argument cannot be coerced into a Number, 
-	this function returns `false`.
+	this function returns `false`.#
+
+*Subject Type*: [.subject]#Number#
 
-*Subject Type*: Number
+*Arguments*:
 
-*Arguments*: 1: Number
+	- [.argName]#_value_# : [.argDesc]#The number to compare the Subject to.#
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: `${fileSize:gt( 1024 )}` will return `true` if the size of the FlowFile's content is more than 1 kilobyte
 	(1024 bytes). Otherwise, it will return `false`.
@@ -294,31 +312,38 @@ or we could check if the value of the attribute `hello` is equal to the value of
 
 
 
-=== *ge*
-*Description*: The `ge` function is used for numeric comparison and returns `true` if the subject is Greater Than 
+[.function]
+=== ge
+*Description*: [.description]#The `ge` function is used for numeric comparison and returns `true` if the subject is Greater Than 
 	Or Equal To its argument. If either the subject or the argument cannot be coerced into a Number, 
-	this function returns `false`.
+	this function returns `false`.#
 
-*Subject Type*: Number
+*Subject Type*: [.subject]#Number#
 
-*Arguments*: 1: Number
+*Arguments*:
 
-*Return Type*: Boolean
+	- [.argName]#_value_# : [.argDesc]#The number to compare the Subject to.#
+
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: `${fileSize:ge( 1024 )}` will return `true` if the size of the FlowFile's content is at least (
 	is greater than or equal to) 1 kilobyte (1024 bytes). Otherwise, it will return `false`.
 
 
-=== *lt*
-*Description*: The `lt` function is used for numeric comparison and returns `true` if the subject is Less Than 
+
+[.function]
+=== lt
+*Description*: [.description]#The `lt` function is used for numeric comparison and returns `true` if the subject is Less Than 
 	its argument. If either the subject or the argument cannot be coerced into a Number, 
-	this function returns `false`.
+	this function returns `false`.#
 
-*Subject Type*: Number
+*Subject Type*: [.subject]#Number#
 
-*Arguments*: 1: Number
+*Arguments*:
 
-*Return Type*: Boolean
+	- [.argName]#_value_# : [.argDesc]#The number to compare the Subject to.#
+
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: `${fileSize:lt( 1048576 )}` will return `true` if the size of the FlowFile's content is less than
 	1 megabyte (1048576 bytes). Otherwise, it will return `false`.
@@ -326,16 +351,19 @@ or we could check if the value of the attribute `hello` is equal to the value of
 
 
 
-=== *le*
-*Description*: The `le` function is used for numeric comparison and returns `true` if the subject is Less Than 
+[.function]
+=== le
+*Description*: [.description]#The `le` function is used for numeric comparison and returns `true` if the subject is Less Than 
 	Or Equal To its argument. If either the subject or the argument cannot be coerced into a Number, 
-	this function returns `false`.
+	this function returns `false`.#
+
+*Subject Type*: [.subject]#Number#
 
-*Subject Type*: Number
+*Arguments*:
 
-*Arguments*: 1: Number
+	- [.argName]#_value_# : [.argDesc]#The number to compare the Subject to.#
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: `${fileSize:le( 1048576 )}` will return `true` if the size of the FlowFile's content is at most
 	(less than or equal to) 1 megabyte (1048576 bytes). Otherwise, it will return `false`.
@@ -345,16 +373,20 @@ or we could check if the value of the attribute `hello` is equal to the value of
 
 
 
-=== *and*
-*Description*: The `and` function takes as a single argument a Boolean value and returns `true` if both the Subject
+[.function]
+=== and
+*Description*: [.description]#The `and` function takes as a single argument a Boolean value and returns `true` if both the Subject
 	and the argument are `true`. If either the subject or the argument is `false` or cannot be coerced into a Boolean,
-	the function returns `false`. Typically, this is used with an embedded Expression as the argument.
+	the function returns `false`. Typically, this is used with an embedded Expression as the argument.#
+
+*Subject Type*: [.subject]#Boolean#
+
+*Arguments*:
 
-*Subject Type*: Boolean
+	- [.argName]#_condition_# : [.argDesc]#The right-hand-side of the 'and' Expression#
 
-*Arguments*: 1: Boolean
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: We can check if the filename is both all lower-case and has at least 5 characters by using the Expression
 -----------------------------------------------
@@ -367,17 +399,20 @@ ${filename:toLower():equals( ${filename} ):and(
 
 
 
-=== *or*
+[.function]
+=== or
 
-*Description*: The `or` function takes as a single argument a Boolean value and returns `true` if either the Subject
+*Description*: [.description]#The `or` function takes as a single argument a Boolean value and returns `true` if either the Subject
 	or the argument is `true`. If both the subject and the argument are `false`, the function returns `false`. If
-	either the Subject or the argument cannot be coerced into a Boolean value, this function will return `false`.
+	either the Subject or the argument cannot be coerced into a Boolean value, this function will return `false`.#
 
-*Subject Type*: Boolean
+*Subject Type*: [.subject]#Boolean#
 
-*Arguments*: 1: Boolean
+*Arguments*:
 
-*Return Type*: Boolean
+	- [.argName]#_condition_# : [.argDesc]#The right-hand-side of the 'or' Expression#
+
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: The following example will return `true` if either the filename has exactly 5 characters or if
 	the filename is all lower-case.
@@ -389,17 +424,22 @@ ${filename:toLower():equals( ${filename} ):or(
 
 
 
+[.function]
+=== not
 
-=== *not*
-
-*Description*: The `not` function returns the negation of the Boolean value of the subject.
+[.description]
+*Description*: [.description]#The `not` function returns the negation of the Boolean value of the subject.#
 
-*Subject Type*: Boolean
+[.subject]
+*Subject Type*: [.subject]#Boolean#
 
+[.arguments]
 *Arguments*: No arguments
 
-*Return Type*: Boolean
+[.returnType]
+*Return Type*: [.returnType]#Boolean#
 
+[.examples]
 *Examples*: We can invert the value of another function by using the `not` function, as 
 	`${filename:equals('hello.txt'):not()}`. This will return `true` if the filename is NOT equal to
 	"hello.txt" and will return `false` if the filename is "hello.txt."
@@ -418,16 +458,17 @@ Each of the following functions manipulates a String in some way.
 
 
 
-=== *toUpper*
+[.function]
+=== toUpper
 
-*Description*: This function converts the Subject into an all upper-case String. Said another way, it
-	replaces any lowercase letter with the uppercase equivalent.
+*Description*: [.description]#This function converts the Subject into an all upper-case String. Said another way, it
+	replaces any lowercase letter with the uppercase equivalent.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute is "abc123.txt", then the Expression `${filename:toUpper()}` 
 	will return "ABC123.TXT"
@@ -436,16 +477,17 @@ Each of the following functions manipulates a String in some way.
 
 
 
-=== *toLower*
+[.function]
+=== toLower
 
-*Description*: This function converts the Subject into an all lower-case String. Said another way,
-	it replaces any uppercase letter with the lowercase equivalent.
+*Description*: [.description]#This function converts the Subject into an all lower-case String. Said another way,
+	it replaces any uppercase letter with the lowercase equivalent.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute is "ABC123.TXT", then the Expression `${filename:toLower()}`
 	will return "abc123.txt"
@@ -454,15 +496,16 @@ Each of the following functions manipulates a String in some way.
 
 
 
-=== *trim*
+[.function]
+=== trim
 
-*Description*: The `trim` function will remove any leading or trailing white space from its subject.
+*Description*: [.description]#The `trim` function will remove any leading or trailing white space from its subject.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the attribute `attr` has the value "     1 2 3     ", then the Expression `${attr:trim()}` will
 	return the value "1 2 3".
@@ -471,16 +514,17 @@ Each of the following functions manipulates a String in some way.
 
 
 
-=== *urlEncode*
+[.function]
+=== urlEncode
 
-*Description*: Returns a URL-friendly version of the Subject. This is useful, for instance, when using an
-	attribute value to indicate the URL of a website.
+*Description*: [.description]#Returns a URL-friendly version of the Subject. This is useful, for instance, when using an
+	attribute value to indicate the URL of a website.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: We can URL-Encode an attribute named "url" by using the Expression `${url:urlEncode()}`. If
 	the value of the "url" attribute is "https://nifi.incubator.apache.org/some value with spaces", this
@@ -489,15 +533,16 @@ Each of the following functions manipulates a String in some way.
 
 
 
-=== *urlDecode*
+[.function]
+=== urlDecode
 
-*Description*: Converts a URL-friendly version of the Subject into a human-readable form.
+*Description*: [.description]#Converts a URL-friendly version of the Subject into a human-readable form.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If we have a URL-Encoded attribute named "url" with the value 
 	"https://nifi.incubator.apache.org/some%20value%20with%20spaces", then the Expression
@@ -507,34 +552,34 @@ Each of the following functions manipulates a String in some way.
 
 
 
-=== *substring*
+[.function]
+=== substring
 
 *Description*: 
-	Returns a portion of the Subject, given a _starting index_ and an optional _ending index_.
+[.description]#Returns a portion of the Subject, given a _starting index_ and an optional _ending index_.
 	If the _ending index_ is not supplied, it will return the portion of the Subject starting at the given
-	'start index' and ending at the end of the Subject value.
-	
-	
-The _starting index_ and _ending index_ are zero-based. That is, the first character is referenced by using
-	the value `0`, not `1`.
+	'start index' and ending at the end of the Subject value.#
+
+[.description]#The _starting index_ and _ending index_ are zero-based. That is, the first character is referenced by using
+	the value `0`, not `1`.#
 
-If either the _starting index_ is or the _ending index_ is not a number, this function call will result
-	in an error.
+[.description]#If either the _starting index_ or the _ending index_ is not a number, this function call will result
+	in an error.#
 
-If the _starting index_ is larger than the _ending index_, this function call will result in an error.
+[.description]#If the _starting index_ is larger than the _ending index_, this function call will result in an error.#
 
-If the _starting index_ or the _ending index_ is greater than the length of the Subject or has a value
-	less than 0, this function call will result in an error.
+[.description]#If the _starting index_ or the _ending index_ is greater than the length of the Subject or has a value
+	less than 0, this function call will result in an error.#
 
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: 
 
-	- _starting index_ : Number
-	- _ending index_ : Number
+	- [.argName]#_starting index_# : [.argDesc]#The 0-based index of the first character to capture (inclusive)#
+	- [.argName]#_ending index_# : [.argDesc]#The 0-based index of the last character to capture (exclusive)#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: 
 
@@ -553,17 +598,20 @@ then the following Expressions will result in the following values:
 
 
 
-=== *substringBefore*
+[.function]
+=== substringBefore
 
-*Description*: Returns a portion of the Subject, starting with the first character of the Subject
+*Description*: [.description]#Returns a portion of the Subject, starting with the first character of the Subject
 	and ending with the character immediately before the first occurrence of the argument. If
-	the argument is not present in the Subject, the entire Subject will be returned.
+	the argument is not present in the Subject, the entire Subject will be returned.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 1: String: The String to search for in the Subject
+*Arguments*:
 
-*Return Type*: String
+	- [.argName]#_value_# : [.argDesc]#The String to search for in the Subject#
+
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt",
 	then the following Expressions will result in the following values:
@@ -581,17 +629,20 @@ then the following Expressions will result in the following values:
 
 
 
-=== *substringBeforeLast*
+[.function]
+=== substringBeforeLast
 
-*Description*: Returns a portion of the Subject, starting with the first character of the Subject
+*Description*: [.description]#Returns a portion of the Subject, starting with the first character of the Subject
 	and ending with the character immediately before the last occurrence of the argument. If
-	the argument is not present in the Subject, the entire Subject will be returned.
+	the argument is not present in the Subject, the entire Subject will be returned.#
+
+*Subject Type*: [.subject]#String#
 
-*Subject Type*: String
+*Arguments*:
 
-*Arguments*: 1: String: The String to search for in the Subject
+	- [.argName]#_value_# : [.argDesc]#The String to search for in the Subject#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt",
 	then the following Expressions will result in the following values:
@@ -610,17 +661,20 @@ then the following Expressions will result in the following values:
 
 
 
-=== *substringAfter*
+[.function]
+=== substringAfter
 
-*Description*: Returns a portion of the Subject, starting with the character immediately after
+*Description*: [.description]#Returns a portion of the Subject, starting with the character immediately after
 	the first occurrence of the argument and extending to the end of the Subject. If
-	the argument is not present in the Subject, the entire Subject will be returned.
+	the argument is not present in the Subject, the entire Subject will be returned.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 1: String: The String to search for in the Subject
+*Arguments*:
 
-*Return Type*: String
+	- [.argName]#_value_# : [.argDesc]#The String to search for in the Subject#
+
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt",
 	then the following Expressions will result in the following values:
@@ -638,17 +692,20 @@ then the following Expressions will result in the following values:
 
 
 
-=== *substringAfterLast*
+[.function]
+=== substringAfterLast
 
-*Description*: Returns a portion of the Subject, starting with the character immediately after
+*Description*: [.description]#Returns a portion of the Subject, starting with the character immediately after
 	the last occurrence of the argument and extending to the end of the Subject. If
-	the argument is not present in the Subject, the entire Subject will be returned.
+	the argument is not present in the Subject, the entire Subject will be returned.#
+
+*Subject Type*: [.subject]#String#
 
-*Subject Type*: String
+*Arguments*:
 
-*Arguments*: 1: String: The String to search for in the Subject
+	- [.argName]#_value_# : [.argDesc]#The String to search for in the Subject#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt",
 	then the following Expressions will result in the following values:
@@ -668,16 +725,19 @@ then the following Expressions will result in the following values:
 
 
 
-=== *append*
+[.function]
+=== append
+
+*Description*: [.description]#The `append` function returns the result of appending the argument to the value of
+	the Subject. If the Subject is null, returns the argument itself.#
 
-*Description*: The `append` function returns the result of appending the argument to the value of
-	the Subject. If the Subject is null, returns the argument itself.
+*Subject Type*: [.subject]#String#
 
-*Subject Type*: String
+*Arguments*:
 
-*Arguments*: 1: String
+	- [.argName]#_value_# : [.argDesc]#The String to append to the end of the Subject#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the Expression
 	`${filename:append('.gz')}` will return "a brand new filename.txt.gz".
@@ -686,16 +746,20 @@ then the following Expressions will result in the following values:
 
 
 
-=== *prepend*
+[.function]
+=== prepend
 
-*Description*: The `prepend` function returns the result of prepending the argument to the value of
-	the Subject. If the subject is null, returns the argument itself.
+*Description*: [.description]#The `prepend` function returns the result of prepending the argument to the value of
+	the Subject. If the subject is null, returns the argument itself.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 1: String
+*Arguments*:
 
-*Return Type*: String
+	- [.argName]#_value_# : [.argDesc]#The String to prepend to the beginning of the Subject#
+
+
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "filename.txt", then the Expression
 	`${filename:prepend('a brand new ')}` will return "a brand new filename.txt".
@@ -704,18 +768,19 @@ then the following Expressions will result in the following values:
 
 
 
-=== *replace*
+[.function]
+=== replace
 
-*Description*: Replaces occurrences of one String within the Subject with another String.
+*Description*: [.description]#Replaces occurrences of one String within the Subject with another String.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 2
+*Arguments*:
 
-	_Search String_: The String to find within the Subject
-	_Replacement_: The value to replace _Search String_ with
+	- [.argName]#_Search String_# : [.argDesc]#The String to find within the Subject#
+	- [.argName]#_Replacement_# : [.argDesc]#The value to replace _Search String_ with#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the following
 Expressions will provide the following results:
@@ -735,28 +800,31 @@ Expressions will provide the following results:
 
 
 
-=== *replaceAll*
+[.function]
+=== replaceAll
 
-*Description*: The `replaceAll` function takes two String arguments: a Regular Expression (NiFi uses the Java Pattern
+*Description*: [.description]#The `replaceAll` function takes two String arguments: a Regular Expression (NiFi uses the Java Pattern
 	syntax), and a replacement string. The return value is the result of substituting the replacement string for
-	all patterns within the Subject that match the Regular Expression.
+	all patterns within the Subject that match the Regular Expression.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 2
+*Arguments*:
 
-	_regular expression_: the Regular Expression (in Java syntax) to match in the Subject
-	_replacement_: The value to use for replacing matches in the Subject. If the _regular expression_
-		argument uses Capturing Groups, back references are allowed in the _replacement_.
 
-*Return Type*: String
+	- [.argName]#_Regex_# : [.argDesc]#The Regular Expression (in Java syntax) to match in the Subject#
+	- [.argName]#_Replacement_# : [.argDesc]#The value to use for replacing matches in the Subject. If the _regular expression_
+		argument uses Capturing Groups, back references are allowed in the _replacement_.#
+
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the following
 Expressions will provide the following results:
 
 
 
-.ReplaceAll Examples
+.replaceAll Examples
 |=======================================================================================
 | Expression | Value
 | `${filename:replaceAll('\..*', '')}` | `a brand new filename`
@@ -770,16 +838,19 @@ Expressions will provide the following results:
 
 
 
-=== *replaceNull*
+[.function]
+=== replaceNull
+
+*Description*: [.description]#The `replaceNull` function returns the argument if the Subject is null. Otherwise,
+	returns the Subject.#
 
-*Description*: The `replaceNull` function returns the argument if the Subject is null. Otherwise,
-	returns the Subject.
+*Subject Type*: [.subject]#Any#
 
-*Subject Type*: Any
+*Arguments*:
 
-*Arguments*: 1: Any Type
+	- [.argName]#_Replacement_# : [.argDesc]#The value to return if the Subject is null.#
 
-*Return Type*: Type of Subject if Subject is not null; else, type of Argument
+*Return Type*: [.returnType]#Type of Subject if Subject is not null; else, type of Argument#
 
 *Examples*: If the attribute "filename" has the value "a brand new filename.txt" and the attribute
 	"hello" does not exist, then the Expression `${filename:replaceNull('abc')}` will return 
@@ -789,15 +860,16 @@ Expressions will provide the following results:
 
 
 
-=== *length*
+[.function]
+=== length
 
-*Description*: Returns the length of the Subject
+*Description*: [.description]#Returns the length of the Subject#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: Number
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the attribute "filename" has a value of "a brand new filename.txt" and the attribute
 	"hello" does not exist, then the Expression `${filename:length()}` will return 24. `${hello:length()}`
@@ -816,16 +888,20 @@ Expressions will provide the following results:
 Each of the following functions is used to search its subject for some value.
 
 
-=== *startsWith*
+[.function]
+=== startsWith
 
-*Description*: Returns `true` if the Subject starts with the String provided as the argument,
-	`false` otherwise.
+*Description*: [.description]#Returns `true` if the Subject starts with the String provided as the argument,
+	`false` otherwise.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 1: String
+*Arguments*:
 
-*Return Type*: Boolean
+	- [.argName]#_value_# : [.argDesc]#The value to search for#
+
+
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the Expression
 	`${filename:startsWith('a brand')}` will return `true`. `${filename:startsWith('A BRAND')}` will
@@ -835,16 +911,19 @@ Each of the following functions is used to search its subject for some value.
 
 
 
-=== *endsWith*
+[.function]
+=== endsWith
+
+*Description*: [.description]#Returns `true` if the Subject ends with the String provided as the argument,
+	`false` otherwise.#
 
-*Description*: Returns `true` if the Subject ends with the String provided as the argument,
-	`false` otherwise.
+*Subject Type*: [.subject]#String#
 
-*Subject Type*: String
+*Arguments*:
 
-*Arguments*: 1: String
+	- [.argName]#_value_# : [.argDesc]#The value to search for#
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the Expression
 	`${filename:endsWith('txt')}` will return `true`. `${filename:endsWith('TXT')}` will
@@ -854,16 +933,18 @@ Each of the following functions is used to search its subject for some value.
 
 
 
-=== *contains*
+[.function]
+=== contains
 
-*Description*: Returns `true` if the Subject contains the value of the argument anywhere in the
-	value.
+*Description*: [.description]#Returns `true` if the Subject contains the value of the argument anywhere in the value.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: 1: String
+*Arguments*:
 
-*Return Type*: Boolean
+	- [.argName]#_value_# : [.argDesc]#The value to search for#
+
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the Expression
 	`${filename:contains('new')}` will return `true`. `${filename:contains('NEW')}` will
@@ -873,16 +954,19 @@ Each of the following functions is used to search its subject for some value.
 
 
 
-=== *find*
+[.function]
+=== find
+
+*Description*: [.description]#Returns `true` if the Subject contains any sequence of characters that matches the
+	Regular Expression provided by the argument.#
 
-*Description*: Returns `true` if the Subject contains any sequence of characters that matches the
-	Regular Expression provided by the argument.
+*Subject Type*: [.subject]#String#
 
-*Subject Type*: String
+*Arguments*:
 
-*Arguments*: 1: String: a Regular Expression (in the Java Pattern syntax) to search for in the Subject.
+	- [.argName]#_Regex_# : [.argDesc]#The Regular Expression (in the Java Pattern syntax) to match against the Subject#
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: 
 
@@ -902,15 +986,18 @@ Expressions will provide the following results:
 
 
 
-=== *matches*
+[.function]
+=== matches
 
-*Description*: Returns `true` if the Subject exactly matches the Regular Expression provided by the argument.
+*Description*: [.description]#Returns `true` if the Subject exactly matches the Regular Expression provided by the argument.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
+
+*Arguments*: 
 
-*Arguments*: 1: String: a Regular Expression (in the Java Pattern syntax) to match against the Subject.
+	- [.argName]#_Regex_# : [.argDesc]#The Regular Expression (in the Java Pattern syntax) to match against the Subject#
 
-*Return Type*: Boolean
+*Return Type*: [.returnType]#Boolean#
 
 *Examples*: 
 
@@ -929,19 +1016,22 @@ Expressions will provide the following results:
 
 
 
-=== *indexOf*
+[.function]
+=== indexOf
 
-*Description*: Returns the index of the first character in the Subject that matches the String value provided
+*Description*: [.description]#Returns the index of the first character in the Subject that matches the String value provided
 	as an argument. If the argument is found multiple times within the Subject, the value returned is the
 	starting index of the *first* occurrence.
 	If the argument cannot be found in the Subject, returns `-1`. The index is zero-based. This means that if
-	the search string is found at the beginning of the Subject, the value returned will be `0`, not `1`.
+	the search string is found at the beginning of the Subject, the value returned will be `0`, not `1`.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: String
+*Arguments*:
 
-*Return Type*: Number
+	- [.argName]#_value_# : [.argDesc]#The value to search for in the Subject#
+
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the following
 Expressions will provide the following results:
@@ -949,30 +1039,33 @@ Expressions will provide the following results:
 
 
 .indexOf Examples
-|=======================================================================================
+|===============================================
 | Expression | Value
 | `${filename:indexOf('a.*txt')}` | `-1`
 | `${filename:indexOf('.')}` | `20`
 | `${filename:indexOf('a')}` | `0`
 | `${filename:indexOf(' ')}` | `1`
-|=======================================================================================
+|===============================================
 
 
 
 
-=== *lastIndexOf*
+[.function]
+=== lastIndexOf
 
-*Description*: Returns the index of the first character in the Subject that matches the String value provided
+*Description*: [.description]#Returns the index of the first character in the Subject that matches the String value provided
 	as an argument. If the argument is found multiple times within the Subject, the value returned is the
 	starting index of the *last* occurrence.
 	If the argument cannot be found in the Subject, returns `-1`. The index is zero-based. This means that if
-	the search string is found at the beginning of the Subject, the value returned will be `0`, not `1`.
+	the search string is found at the beginning of the Subject, the value returned will be `0`, not `1`.#
+
+*Subject Type*: [.subject]#String#
 
-*Subject Type*: String
+*Arguments*:
 
-*Arguments*: String
+	- [.argName]#_value_# : [.argDesc]#The value to search for in the Subject#
 
-*Return Type*: Number
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "filename" attribute has the value "a brand new filename.txt", then the following
 Expressions will provide the following results:
@@ -993,16 +1086,19 @@ Expressions will provide the following results:
 == Mathematical Operations and Numeric Manipulation
 
 
-=== *plus*
+[.function]
+=== plus
 
-*Description*: Adds a numeric value to the Subject. If either the argument or the Subject cannot be
-	coerced into a Number, returns `null`.
+*Description*: [.description]#Adds a numeric value to the Subject. If either the argument or the Subject cannot be
+	coerced into a Number, returns `null`.#
 
-*Subject Type*: Number
+*Subject Type*: [.subject]#Number#
 
-*Arguments*: 1: Number
+*Arguments*:
 
-*Return Type*: Number
+	- [.argName]#_Operand_# : [.argDesc]#The value to add to the Subject#
+
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "fileSize" attribute has a value of 100, then the Expression `${fileSize:plus(1000)}`
 	will return the value `1100`.
@@ -1011,15 +1107,18 @@ Expressions will provide the following results:
 
 
 
-=== *minus*
+[.function]
+=== minus
+
+*Description*: [.description]#Subtracts a numeric value from the Subject.#
 
-*Description*: Subtracts a numeric value from the Subject.
+*Subject Type*: [.subject]#Number#
 
-*Subject Type*: Number
+*Arguments*:
 
-*Arguments*: Number
+	- [.argName]#_Operand_# : [.argDesc]#The value to subtract from the Subject#
 
-*Return Type*: Number
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "fileSize" attribute has a value of 100, then the Expression `${fileSize:minus(100)}`
 	will return the value `0`.
@@ -1028,15 +1127,18 @@ Expressions will provide the following results:
 
 
 
-=== *multiply*
+[.function]
+=== multiply
 
-*Description*: Multiplies a numeric value by the Subject and returns the product.
+*Description*: [.description]#Multiplies a numeric value by the Subject and returns the product.#
 
-*Subject Type*: Number
+*Subject Type*: [.subject]#Number#
 
-*Arguments*: Number
+*Arguments*:
 
-*Return Type*: Number
+	- [.argName]#_Operand_# : [.argDesc]#The value to multiply the Subject by#
+
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "fileSize" attribute has a value of 100, then the Expression `${fileSize:multiply(1024)}`
 	will return the value `102400`.
@@ -1044,15 +1146,18 @@ Expressions will provide the following results:
 
 
 
-=== *divide*
+[.function]
+=== divide
+
+*Description*: [.description]#Divides the Subject by a numeric value and returns the result, rounded down to the nearest integer.#
 
-*Description*: Divides a numeric value by the Subject and returns the result, rounded down to the nearest integer.
+*Subject Type*: [.subject]#Number#
 
-*Subject Type*: Number
+*Arguments*:
 
-*Arguments*: Number
+	- [.argName]#_Operand_# : [.argDesc]#The value to divide the Subject by#
 
-*Return Type*: Number
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "fileSize" attribute has a value of 100, then the Expression `${fileSize:divide(12)}`
 	will return the value `8`.
@@ -1060,16 +1165,19 @@ Expressions will provide the following results:
 
 
 
-=== *mod*
+[.function]
+=== mod
 
-*Description*: Performs a modular division of the Subject by the argument. That is, this function will divide
-	the Subject by the value of the argument and return not the quotient but rather the remainder.
+*Description*: [.description]#Performs a modular division of the Subject by the argument. That is, this function will divide
+	the Subject by the value of the argument and return not the quotient but rather the remainder.#
 
-*Subject Type*: Number
+*Subject Type*: [.subject]#Number#
 
-*Arguments*: Number
+*Arguments*:
 
-*Return Type*: Number
+	- [.argName]#_Operand_# : [.argDesc]#The value to divide the Subject by#
+
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the "fileSize" attribute has a value of 100, then the Expression `${fileSize:mod(12)}`
 	will return the value `4`.
@@ -1078,20 +1186,21 @@ Expressions will provide the following results:
 
 
 
-=== *toRadix*
+[.function]
+=== toRadix
 
-*Description*: Converts the Subject from a Base 10 number to a different Radix (or number base). An optional
+*Description*: [.description]#Converts the Subject from a Base 10 number to a different Radix (or number base). An optional
 	second argument can be used to indicate the minimum number of characters to be used. If the converted value
-	has fewer than this number of characters, the number will be padded with leading zeroes.
+	has fewer than this number of characters, the number will be padded with leading zeroes.#
 
-*Subject Type*: Number
+*Subject Type*: [.subject]#Number#
 
-*Arguments*: 2
+*Arguments*:
 
-	_Desired Base_: A Number between 2 and 36 (inclusive)
-	_Padding_: Optional argument that specifies the minimum number of characters in the converted output
+	- [.argName]#_Desired Base_# : [.argDesc]#A Number between 2 and 36 (inclusive)#
+	- [.argName]#_Padding_# : [.argDesc]#Optional argument that specifies the minimum number of characters in the converted output#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the "fileSize" attribute has a value of 1024, then the following Expressions will yield
 	the following results:
@@ -1118,17 +1227,20 @@ Expressions will provide the following results:
 
 
 [[format]]
-=== *format*
+[.function]
+=== format
 
-*Description*: Formats a number as a date/time according to the format specified by the argument. The argument
+*Description*: [.description]#Formats a number as a date/time according to the format specified by the argument. The argument
 	must be a String that is a valid Java SimpleDateFormat format. The Subject is expected to be a Number that
-	represents the number of milliseconds since Midnight GMT January 1, 1970.
+	represents the number of milliseconds since Midnight GMT January 1, 1970.#
+
+*Subject Type*: [.subject]#Number#
 
-*Subject Type*: Number
+*Arguments*:
 
-*Arguments*: 1: String: The format to output the date in.
+	- [.argName]#_format_# : [.argDesc]#The format to use in the Java SimpleDateFormat syntax#
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: If the attribute "time" has the value "1420058163264", then the following Expressions will yield
 	the following results:
@@ -1146,18 +1258,22 @@ Expressions will provide the following results:
 
 
 
-=== *toDate*
+[.function]
+=== toDate
 
-*Description*: Converts a String into a Number, based on the format specified by the argument. The argument
-	must be a String that is a valid Java SimpleDateFormat format. The Subject is expected to be a String
+*Description*: [.description]#Converts a String into a Number, based on the format specified by the argument. The argument
+	must be a String in valid Java SimpleDateFormat syntax. The Subject is expected to be a String
 	that is formatted according to the argument. The return value is the number of milliseconds since 
-	Midnight GMT January 1, 1979.
+	Midnight GMT January 1, 1970.#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
-*Arguments*: String
+*Arguments*:
+	
+		- [.argName]#_format_# : [.argDesc]#The current format to use when parsing the Subject, in the Java SimpleDateFormat syntax.#
 
-*Return Type*: Number
+
+*Return Type*: [.returnType]#Number#
 
 *Examples*: If the attribute "year" has the value "2014" and the attribute "time" has the value "2014/12/31 15:36:03.264Z",
 	then the Expression `${year:toDate('yyyy')}` will return the number of milliseconds between Midnight GMT on January 1, 1970
@@ -1171,16 +1287,17 @@ Expressions will provide the following results:
 
 
 
-=== *now*
+[.function]
+=== now
 
-*Description*: The `now` function returns the current date and time as the number of milliseconds since Midnight GMT on
-	January 1, 1970.
+*Description*: [.description]#The `now` function returns the current date and time as the number of milliseconds since Midnight GMT on
+	January 1, 1970.#
 
-*Subject Type*: No Subject
+*Subject Type*: [.subject]#No Subject#
 
 *Arguments*: No arguments
 
-*Return Type*: Number
+*Return Type*: [.returnType]#Number#
 
 *Examples*: We can format the current date and time by using the `now` function in conjunction with the <<format>> function:
 	`${now():format('yyyy/MM/dd HH:mm:ss')}`.
@@ -1192,15 +1309,16 @@ Expressions will provide the following results:
 [[type_cast]]
 == Type Coercion
 
-=== *toString*
+[.function]
+=== toString
 
-*Description*: Coerces the Subject into a String
+*Description*: [.description]#Coerces the Subject into a String#
 
-*Subject Type*: Any type
+*Subject Type*: [.subject]#Any type#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
 *Examples*: The Expression `${fileSize:toNumber():toString()}` converts the value of "fileSize" attribute to a number and
 	back to a String.
@@ -1209,15 +1327,16 @@ Expressions will provide the following results:
 
 
 
-=== *toNumber*
+[.function]
+=== toNumber
 
-*Description*: Coerces the Subject into a Number
+*Description*: [.description]#Coerces the Subject into a Number#
 
-*Subject Type*: String
+*Subject Type*: [.subject]#String#
 
 *Arguments*: No arguments
 
-*Return Type*: Number
+*Return Type*: [.returnType]#Number#
 
 *Examples*: The Expression `${fileSize:toNumber()}` converts the String attribute value of "fileSize" to a number.
 
@@ -1229,69 +1348,89 @@ Expressions will provide the following results:
 [[subjectless]]
 == Subjectless Functions
 
-=== *ip*
+While the majority of functions in the Expression Language are called by using the syntax
+`${attributeName:function()}`, there exist a few functions that are not expected to have subjects.
+In this case, the attribute name is not present. For example, the IP address of the machine can
+be obtained by using the Expression `${ip()}`. All of the functions in this section are to be called
+without a subject. Attempting to call a subjectless function and provide it a subject will result in
+an error when validating the function.
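As a hypothetical illustration of that last point, an Expression such as `${filename:ip()}` would be reported as an error when the Expression is validated, because `ip` does not accept a subject, whereas `${ip()}` is accepted.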
+
 
-*Description*: Returns the IP address of the machine.
+[.function]
+=== ip
 
-*Subject Type*: No subject
+*Description*: [.description]#Returns the IP address of the machine.#
+
+*Subject Type*: [.subjectless]#No subject#
 
 *Arguments*: No arguments
 
-*Return Type*: String
+*Return Type*: [.returnType]#String#
 
-*Examples*: ${ip()}
+*Examples*: The IP address of the machine can be obtained by using the Expression `${ip()}`.
 
 
 
 
 
-=== *hostname*
+[.function]
+=== hostname
 
-*Description*: Returns the Hostname of the machine. An optional argument of type Boolean can be provided
+*Description*: [.description]#Returns the Hostname of the machine. An optional argument of type Boolean can be provided
 	to specify whether or not the Fully Qualified Domain Name should be used. If `false`, or not specified,
 	the hostname will not be fully qualified. If the argument is `true` but the fully qualified hostname
-	cannot be resolved, the simple hostname will be returned.
+	cannot be resolved, the simple hostname will be returned.#
 
-*Subject Type*: No subject
+*Subject Type*: [.subjectless]#No subject#
 
-*Arguments*: 1: Boolean: Optionally specify whether or not the hostname to return should be fully qualified,
-	if not specified, defaults to `false`.
+*Arguments*:
 
-*Return Type*: String
+	- [.argName]#_Fully Qualified_# : [.argDesc]#Optional parameter that specifies whether or not the hostname should be
+		fully qualified. If not specified, defaults to false.#
 
-*Examples*: ${hostname(true)}
+*Return Type*: [.returnType]#String#
 
+*Examples*: The fully qualified hostname of the machine can be obtained by using the Expression `${hostname(true)}`,
+	while the simple hostname can be obtained by using either `${hostname(false)}` or simply `${hostname()}`.
 
 
 
 
-=== *UUID*
 
-*Description*: 
+[.function]
+=== UUID
 
-*Subject Type*: 
+*Description*: [.description]#Returns a randomly generated UUID.#
 
-*Arguments*: 
+*Subject Type*: [.subjectless]#No Subject#
 
-*Return Type*: 
+*Arguments*: No arguments
 
-*Examples*: 
+*Return Type*: [.returnType]#String#
 
+*Examples*: `${UUID()}` returns a value similar to `de305d54-75b4-431b-adb2-eb6b9e546013`
 
 
 
 
-=== *nextInt*
 
-*Description*: 
+[.function]
+=== nextInt
 
-*Subject Type*: 
+*Description*: [.description]#Returns a one-up value (starting at 0) that increases over the lifetime of the running instance of NiFi. 
+	This value is not persisted across restarts and is not guaranteed to be unique across a cluster. 
+	This value is considered "one-up" in that if called multiple times across the NiFi instance, the values will be sequential. 
+	However, this counter is shared across all NiFi components, so calling this function multiple times from one Processor will 
+	not guarantee sequential values within the context of a particular Processor.#
 
-*Arguments*: 
+*Subject Type*: [.subjectless]#No Subject#
 
-*Return Type*: 
+*Arguments*: No arguments
 
-*Examples*: 
+*Return Type*: [.returnType]#Number#
+
+*Examples*: If the previous value returned by `nextInt` was `5`, the Expression `${nextInt():divide(2)}` obtains the next available 
+	integer (6) and divides the result by 2, returning a value of `3`.
 
 
 
@@ -1301,101 +1440,251 @@ Expressions will provide the following results:
 [[multi]]
 == Evaluating Multiple Attributes
 
-=== *anyAttribute*
+When it becomes necessary to evaluate the same conditions against multiple attributes, this can be accomplished by means of the 
+`and` and `or` functions. However, this quickly becomes tedious, error-prone, and difficult to maintain. For this reason, NiFi
+provides several functions for evaluating the same conditions against groups of attributes at the same time.
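As a rough illustration, using the sample attribute values from the tables below, the chained form and the grouped form of the same check could be written as:
-----------------------------------------------
${abc:contains('world'):and( ${xyz:contains('world')} )}
${allAttributes("abc", "xyz"):contains("world")}
-----------------------------------------------
Both evaluate to `true` for those sample values, but the grouped form stays readable as more attribute names are added.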
 
-*Description*: 
 
-*Subject Type*: 
 
-*Arguments*: 
 
-*Return Type*: 
+[.function]
+=== anyAttribute
+  
+*Description*: [.description]#Checks to see if any of the given attributes match the given condition. This function has no subject and takes one or more
+	arguments that are the names of attributes to which the remainder of the Expression is to be applied. If any of the attributes specified,
+	when evaluated against the rest of the Expression, returns a value of `true`, then this function will return `true`. Otherwise, this function
+	will return `false`.#
 
-*Examples*: 
+*Subject Type*: [.subjectless]#No Subject#
 
+*Arguments*:
 
+	- [.argName]#_Attribute Names_# : [.argDesc]#One or more attribute names to evaluate#
 
 
+*Return Type*: [.returnType]#Boolean#
 
-=== *allAttributes*
+*Examples*: Given that the "abc" attribute contains the value "hello world", "xyz" contains "good bye world", 
+	and "filename" contains "file.txt", consider the following examples:
 
-*Description*: 
+.anyAttribute Examples
+|=======================================================================
+| Expression | Value
+| `${anyAttribute("abc", "xyz"):contains("bye")}` | `true`
+| `${anyAttribute("filename","xyz"):toUpper():contains("e")}` | `false`
+|=======================================================================
+
+
+
+
+[.function]
+=== allAttributes
+
+*Description*: [.description]#Checks to see if all of the given attributes match the given condition. This function has no subject and takes one or more
+	arguments that are the names of attributes to which the remainder of the Expression is to be applied. If all of the attributes specified,
+	when evaluated against the rest of the Expression, return a value of `true`, then this function will return `true`. Otherwise, this function
+	will return `false`.#
 
-*Subject Type*: 
+*Subject Type*: [.subjectless]#No Subject#
 
 *Arguments*: 
 
-*Return Type*: 
+	- [.argName]#_Attribute Names_# : [.argDesc]#One or more attribute names to evaluate#
 
-*Examples*: 
+*Return Type*: [.returnType]#Boolean#
 
+*Examples*: Given that the "abc" attribute contains the value "hello world", "xyz" contains "good bye world", 
+	and "filename" contains "file.txt", consider the following examples:
 
+.allAttributes Examples
+|=============================================================================
+| Expression | Value
+| `${allAttributes("abc", "xyz"):contains("world")}` | `true`
+| `${allAttributes("abc", "filename","xyz"):toUpper():contains("e")}` | `false`
+|=============================================================================
 
 
 
-=== *anyMatchingAttribute*
 
-*Description*: 
 
-*Subject Type*: 
+[.function]
+=== anyMatchingAttribute
 
-*Arguments*: 
+*Description*: [.description]#Checks to see if any of the given attributes match the given condition. This function has no subject and takes one or more
+	arguments that are Regular Expressions to match against attribute names. Any attribute whose name matches one of the supplied
+	Regular Expressions will be evaluated against the rest of the Expression. If any of the attributes specified,
+	when evaluated against the rest of the Expression, returns a value of `true`, then this function will return `true`. Otherwise, this function
+	will return `false`.#
 
-*Return Type*: 
+*Subject Type*: [.subjectless]#No Subject#
 
-*Examples*: 
+*Arguments*:
 
+	- [.argName]#_Regex_# : [.argDesc]#One or more Regular Expressions (in the Java Pattern syntax) to evaluate against attribute names#
 
 
+*Return Type*: [.returnType]#Boolean#
 
+*Examples*: Given that the "abc" attribute contains the value "hello world", "xyz" contains "good bye world", 
+	and "filename" contains "file.txt", consider the following examples:
 
-=== *allMatchingAttributes*
+.anyMatchingAttribute Examples
+|==============================================================
+| Expression | Value
+| `${anyMatchingAttribute("[ax].*"):contains('bye')}` | `true`
+| `${anyMatchingAttribute(".*"):isNull()}` | `false`
+|==============================================================
 
-*Description*: 
 
-*Subject Type*: 
 
-*Arguments*: 
 
-*Return Type*: 
 
-*Examples*: 
+[.function]
+=== allMatchingAttributes
 
+*Description*: [.description]#Checks to see if all of the given attributes match the given condition. This function has no subject and takes one or more
+	arguments that are Regular Expressions to match against attribute names. Any attribute whose name matches one of the supplied
+	Regular Expressions will be evaluated against the rest of the Expression. If all of the attributes specified,
+	when evaluated against the rest of the Expression, return a value of `true`, then this function will return `true`. Otherwise, this function
+	will return `false`.#
 
+*Subject Type*: [.subjectless]#No Subject#
+
+*Arguments*:
+
+	- [.argName]#_Regex_# : [.argDesc]#One or more Regular Expressions (in the Java Pattern syntax) to evaluate against attribute names#
 
+*Return Type*: [.returnType]#Boolean#
 
-=== *anyDelineatedValue*
+*Examples*: Given that the "abc" attribute contains the value "hello world", "xyz" contains "good bye world", 
+	and "filename" contains "file.txt", consider the following examples:
 
-*Description*: 
+.allMatchingAttributes Examples
+|==============================================================
+| Expression | Value
+| `${allMatchingAttributes("[ax].*"):contains("world")}` | `true`
+| `${allMatchingAttributes(".*"):isNull()}` | `false`
+| `${allMatchingAttributes("f.*"):count()}` | `1`
+|==============================================================
 
-*Subject Type*: 
 
-*Arguments*: 
 
-*Return Type*: 
 
-*Examples*: 
 
+[.function]
+=== anyDelineatedValue
 
+*Description*: [.description]#Splits a String apart according to a delimiter that is provided, and then evaluates each of the values against
+	the rest of the Expression. If the Expression, when evaluated against any of the individual values, returns `true`, this
+	function returns `true`. Otherwise, the function returns `false`.#
 
+*Subject Type*: [.subjectless]#No Subject#
 
+*Arguments*:
 
-=== *allDelineatedValues*
+	- [.argName]#_Delineated Value_# : [.argDesc]#The value that is delineated. This is generally an embedded Expression, 
+		though it does not have to be.#
+	- [.argName]#_Delimiter_# : [.argDesc]#The value to use to split apart the _delineatedValue_ argument.#
 
-*Description*: 
+*Return Type*: [.returnType]#Boolean#
 
-*Subject Type*: 
+*Examples*: Given that the "number_list" attribute contains the value "1,2,3,4,5", and the "word_list" attribute contains the value "the,and,or,not", 
+	consider the following examples:
 
-*Arguments*: 
+.anyDelineatedValue Examples
+|===============================================================================
+| Expression | Value
+| `${anyDelineatedValue("${number_list}", ","):contains("5")}` | `true`
+| `${anyDelineatedValue("this that and", ","):equals("${word_list}")}` | `false`
+|===============================================================================
 
-*Return Type*: 
 
-*Examples*: 
 
+[.function]
+=== allDelineatedValues
+
+*Description*: [.description]#Splits a String apart according to a delimiter that is provided, and then evaluates each of the values against
+	the rest of the Expression. If the Expression, when evaluated against all of the individual values, returns `true` in each
+	case, then this function returns `true`. Otherwise, the function returns `false`.#
 
+*Subject Type*: [.subjectless]#No Subject#
 
+*Arguments*:
 
+	- [.argName]#_Delineated Value_# : [.argDesc]#The value that is delineated. This is generally 
+		an embedded Expression, though it does not have to be.#
 
+	- [.argName]#_Delimiter_# : [.argDesc]#The value to use to split apart the _delineatedValue_ argument.#
 
+*Return Type*: [.returnType]#Boolean#
+
+*Examples*: Given that the "number_list" attribute contains the value "1,2,3,4,5", and the "word_list" attribute contains the value "those,known,or,not", 
+	consider the following examples:
+
+.allDelineatedValues Examples
+|===============================================================================
+| Expression | Value
+| `${allDelineatedValues("${word_list}", ","):contains("o")}` | `true`
+| `${allDelineatedValues("${number_list}", ","):count()}` | `4`
+| `${allDelineatedValues("${number_list}", ","):matches("[0-9]+")}` | `true`
+| `${allDelineatedValues("${word_list}", ","):matches('e')}` | `false`
+|===============================================================================
+
+
+
+
+[.function]
+=== join
+
+*Description*: [.description]#Aggregate function that concatenates multiple values with the specified delimiter. This function 
+	may be used only in conjunction with the `allAttributes`, `allMatchingAttributes`, and `allDelineatedValues`
+	functions.#
+
+*Subject Type*: [.subject]#String#
+
+*Arguments*:
+
+	- [.argName]#_Delimiter_# : [.argDesc]#The String delimiter to use when joining values#
+
+*Return Type*: [.returnType]#String#
+
+*Examples*: Given that the "abc" attribute contains the value "hello world", "xyz" contains "good bye world", 
+	and "filename" contains "file.txt", consider the following examples:
+
+.join Examples
+|=======================================================================================
+| Expression | Value
+| `${allMatchingAttributes("[ax].*"):substringBefore(" "):join("-")}` | `hello-good`
+| `${allAttributes("abc", "xyz"):join(" now")}` | `hello world nowgood bye world now`
+|=======================================================================================
+
+
+
+
+
+
+[.function]
+=== count
+
+*Description*: [.description]#Aggregate function that counts the number of non-null, non-false values returned by the 
+	`allAttributes`, `allMatchingAttributes`, and `allDelineatedValues` functions. This function 
+	may be used only in conjunction with the `allAttributes`, `allMatchingAttributes`, and `allDelineatedValues`
+	functions.#
+
+*Subject Type*: [.subject]#Any#
+
+*Arguments*: No arguments
+
+*Return Type*: [.returnType]#Number#
+
+*Examples*: Given that the "abc" attribute contains the value "hello world", "xyz" contains "good bye world", 
+	and "number_list" contains "1,2,3,4,5", consider the following examples:
+
+.count Examples
+|===========================================================================
+| Expression | Value
+| `${allMatchingAttributes("[ax].*"):substringBefore(" "):count()}` | `2`
+| `${allAttributes("abc", "xyz"):contains("world"):count()}` | `1`
+| `${allDelineatedValues(${number_list}, ","):count()}` | `5`
+| `${allAttributes("abc", "non-existent-attr", "xyz"):count()}` | `2`
+| `${allMatchingAttributes(".*"):length():gt(10):count()}` | `2`
+|===========================================================================
 


[5/7] incubator-nifi git commit: NIFI-220: add kafka bundle to root pom

Posted by ma...@apache.org.
NIFI-220: add kafka bundle to root pom


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/b1e5c5e6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/b1e5c5e6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/b1e5c5e6

Branch: refs/heads/develop
Commit: b1e5c5e66a9d07ffefed1687e5054c634e14f68e
Parents: 090ad38
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 5 20:33:22 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 5 20:33:22 2015 -0500

----------------------------------------------------------------------
 nar-bundles/pom.xml | 13 ++++++++++++-
 1 file changed, 12 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/b1e5c5e6/nar-bundles/pom.xml
----------------------------------------------------------------------
diff --git a/nar-bundles/pom.xml b/nar-bundles/pom.xml
index 4985cdc..7c79a48 100644
--- a/nar-bundles/pom.xml
+++ b/nar-bundles/pom.xml
@@ -40,6 +40,7 @@
         <module>standard-services</module>
         <module>update-attribute-bundle</module>
         <module>volatile-provenance-repository-bundle</module>
+		<module>kafka-bundle</module>
     </modules>
     <dependencyManagement>
         <dependencies>
@@ -86,7 +87,17 @@
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>ssl-context-service</artifactId>
                 <version>${project.version}</version>
-            </dependency>                      
+            </dependency>
+            <dependency>
+            	<groupId>org.apache.nifi</groupId>
+            	<artifactId>http-context-map-api</artifactId>
+            	<version>${project.version}</version>
+            </dependency>
+            <dependency>
+            	<groupId>org.apache.nifi</groupId>
+            	<artifactId>http-context-map</artifactId>
+            	<version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.nifi</groupId>
                 <artifactId>volatile-provenance-repository</artifactId>


[2/7] incubator-nifi git commit: NIFI-220: Allow for demarcator to be specified for Kafka Get and Put and added unit tests; updated docs

Posted by ma...@apache.org.
NIFI-220: Allow for demarcator to be specified for Kafka Get and Put and added unit tests; updated docs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c91c7e78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c91c7e78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c91c7e78

Branch: refs/heads/develop
Commit: c91c7e78970a5261b05572dfbad02ee91287bf5e
Parents: 3e2f790
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Jan 4 21:10:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Jan 4 21:10:34 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java  | 127 +++++---
 .../apache/nifi/processors/kafka/PutKafka.java  | 287 +++++++++++++++----
 .../index.html                                  |  40 ++-
 .../index.html                                  |  56 ++--
 .../nifi/processors/kafka/TestGetKafka.java     | 109 ++++++-
 .../nifi/processors/kafka/TestPutKafka.java     | 174 ++++++++++-
 6 files changed, 662 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 55c67e3..ea4296e 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -24,6 +24,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -81,6 +82,26 @@ public class GetKafka extends AbstractProcessor {
 	    .expressionLanguageSupported(false)
 	    .defaultValue("30 secs")
 	    .build();
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be "
+                + "concatenated together with the <Message Demarcator> string placed between the content of each message. "
+                + "If the messages from Kafka should not be concatenated together, leave this value at 1.")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1")
+        .build();
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+        .name("Message Demarcator")
+        .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> "
+                + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, "
+                + "this value will be placed in between them.")
+        .required(true)
+        .addValidator(Validator.VALID)  // accept anything as a demarcator, including empty string
+        .expressionLanguageSupported(false)
+        .defaultValue("\\n")
+        .build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
         .name("Client Name")
         .description("Client Name to use when communicating with Kafka")
@@ -113,6 +134,8 @@ public class GetKafka extends AbstractProcessor {
         props.add(ZOOKEEPER_CONNECTION_STRING);
         props.add(TOPIC);
         props.add(ZOOKEEPER_COMMIT_DELAY);
+        props.add(BATCH_SIZE);
+        props.add(MESSAGE_DEMARCATOR);
         props.add(clientNameWithDefault);
         props.add(KAFKA_TIMEOUT);
         props.add(ZOOKEEPER_TIMEOUT);
@@ -181,15 +204,25 @@ public class GetKafka extends AbstractProcessor {
     	}
     }
     
+    protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+        return streamIterators.poll();
+    }
+    
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-    	ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+    	ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
     	if ( iterator == null ) {
     		return;
     	}
     	
+    	final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+    	final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+    	final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
+    	final String topic = context.getProperty(TOPIC).getValue();
+    	
     	FlowFile flowFile = null;
     	try {
+    	    // add the current thread to the Set of those to be interrupted if processor stopped.
     		interruptionLock.lock();
     		try {
     			interruptableThreads.add(Thread.currentThread());
@@ -197,52 +230,73 @@ public class GetKafka extends AbstractProcessor {
     			interruptionLock.unlock();
     		}
     		
-    		try {
-	    		if (!iterator.hasNext() ) {
-	    			return;
-	    		}
-    		} catch (final Exception e) {
-    			getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e});
-    			iterator = null;
-    			return;
-    		}
-    		
     		final long start = System.nanoTime();
-    		final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-    		
-    		if ( mam == null ) {
-    			return;
-    		}
-    		
-    		final byte[] key = mam.key();
+    		flowFile = session.create();
     		
     		final Map<String, String> attributes = new HashMap<>();
-    		if ( key != null ) {
-    			attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+            attributes.put("kafka.topic", topic);
+
+            int numMessages = 0;
+    		for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+    		    // if the processor is stopped, iterator.hasNext() will throw an Exception.
+    		    // In this case, we just break out of the loop.
+    		    try {
+        		    if ( !iterator.hasNext() ) {
+        		        break;
+        		    }
+    		    } catch (final Exception e) {
+    		        break;
+    		    }
+    		    
+        		final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+        		if ( mam == null ) {
+        			return;
+        		}
+        		
+        		final byte[] key = mam.key();
+        		
+        		if ( batchSize == 1 ) {
+        		    // the kafka.key, kafka.offset, and kafka.partition attributes are added only
+        		    // for a batch size of 1.
+        		    if ( key != null ) {
+        		        attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+        		    }
+        		    
+            		attributes.put("kafka.offset", String.valueOf(mam.offset()));
+            		attributes.put("kafka.partition", String.valueOf(mam.partition()));
+        		}
+        		
+        		// add the message to the FlowFile's contents
+        		final boolean firstMessage = (msgCount == 0);
+        		flowFile = session.append(flowFile, new OutputStreamCallback() {
+    				@Override
+    				public void process(final OutputStream out) throws IOException {
+    				    if ( !firstMessage ) {
+    				        out.write(demarcatorBytes);
+    				    }
+    					out.write(mam.message());
+    				}
+        		});
+        		numMessages++;
     		}
-    		attributes.put("kafka.offset", String.valueOf(mam.offset()));
-    		attributes.put("kafka.partition", String.valueOf(mam.partition()));
-    		attributes.put("kafka.topic", mam.topic());
     		
-    		flowFile = session.create();
-    		flowFile = session.write(flowFile, new OutputStreamCallback() {
-				@Override
-				public void process(final OutputStream out) throws IOException {
-					out.write(mam.message());
-				}
-    		});
-    		
-    		flowFile = session.putAllAttributes(flowFile, attributes);
-    		final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-    		session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
-    		getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
-    		session.transfer(flowFile, REL_SUCCESS);
+    		// If we received no messages, remove the FlowFile. Otherwise, send to success.
+    		if ( flowFile.getSize() == 0L ) {
+    		    session.remove(flowFile);
+    		} else {
+        		flowFile = session.putAllAttributes(flowFile, attributes);
+        		final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        		session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, millis);
+        		getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
+        		session.transfer(flowFile, REL_SUCCESS);
+    		}
     	} catch (final Exception e) {
     		getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
     		if ( flowFile != null ) {
     			session.remove(flowFile);
     		}
     	} finally {
+    	    // Remove the current thread from the Set of Threads to interrupt.
     		interruptionLock.lock();
     		try {
     			interruptableThreads.remove(Thread.currentThread());
@@ -250,6 +304,7 @@ public class GetKafka extends AbstractProcessor {
     			interruptionLock.unlock();
     		}
     		
+    		// Add the iterator back to the queue
     		if ( iterator != null ) {
     			streamIterators.offer(iterator);
     		}

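To make the new batching behavior concrete, here is a small stand-alone sketch (illustrative only; the class and method names are hypothetical and not part of this commit). It applies the same replace() chain that onTrigger() uses to unescape the configured <Message Demarcator>, then shows the content a two-message batch produces:

    import java.nio.charset.StandardCharsets;

    public class DemarcatorSketch {

        // Mirrors the replace() chain in GetKafka.onTrigger(): the literal text the user
        // types for <Message Demarcator> ("\\n", "\\r", "\\t") becomes real control characters.
        static String unescape(final String demarcator) {
            return demarcator.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
        }

        public static void main(final String[] args) {
            final byte[] demarcatorBytes = unescape("\\n").getBytes(StandardCharsets.UTF_8);

            // With <Batch Size> set to 2, two Kafka messages are appended into a single
            // FlowFile with the demarcator between them, which is the content the
            // testWithDelimiter() unit test later in this commit asserts.
            final String flowFileContent = "Hello" + new String(demarcatorBytes, StandardCharsets.UTF_8) + "Good-bye";
            System.out.println(flowFileContent);
        }
    }
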
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 5e5940c..4b5a742 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -48,7 +48,14 @@ import org.apache.nifi.processor.annotation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+import org.apache.nifi.util.LongHolder;
+
+import scala.actors.threadpool.Arrays;
 
 @SupportsBatching
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@@ -90,6 +97,24 @@ public class PutKafka extends AbstractProcessor {
 		.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
 		.defaultValue(DELIVERY_BEST_EFFORT.getValue())
 		.build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
+                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+                    + "sent as a separate Kafka message.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+        .name("Max Buffer Size")
+        .description("The maximum amount of data to buffer in memory before sending to Kafka")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1 MB")
+        .build();
     public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
 	    .name("Communications Timeout")
 	    .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
@@ -98,14 +123,6 @@ public class PutKafka extends AbstractProcessor {
 	    .expressionLanguageSupported(false)
 	    .defaultValue("30 secs")
 	    .build();
-    public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
-		.name("Max FlowFile Size")
-		.description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
-		.required(true)
-		.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-		.expressionLanguageSupported(false)
-		.defaultValue("1 MB")
-		.build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
 	    .name("Client Name")
 	    .description("Client Name to use when communicating with Kafka")
@@ -123,10 +140,6 @@ public class PutKafka extends AbstractProcessor {
 	    .name("failure")
 	    .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
 	    .build();
-    public static final Relationship REL_REJECT = new Relationship.Builder()
-	    .name("reject")
-	    .description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
-	    .build();
 
     private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
     
@@ -142,8 +155,9 @@ public class PutKafka extends AbstractProcessor {
         props.add(TOPIC);
         props.add(KEY);
         props.add(DELIVERY_GUARANTEE);
+        props.add(MESSAGE_DELIMITER);
+        props.add(MAX_BUFFER_SIZE);
         props.add(TIMEOUT);
-        props.add(MAX_FLOWFILE_SIZE);
         props.add(clientName);
         return props;
     }
@@ -153,7 +167,6 @@ public class PutKafka extends AbstractProcessor {
         final Set<Relationship> relationships = new HashSet<>(1);
         relationships.add(REL_SUCCESS);
         relationships.add(REL_FAILURE);
-        relationships.add(REL_REJECT);
         return relationships;
     }
     
@@ -167,11 +180,10 @@ public class PutKafka extends AbstractProcessor {
     	}
     }
     
-    
-    private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
-    	final String brokers = context.getProperty(SEED_BROKERS).getValue();
+    protected ProducerConfig createConfig(final ProcessContext context) {
+        final String brokers = context.getProperty(SEED_BROKERS).getValue();
 
-    	final Properties properties = new Properties();
+        final Properties properties = new Properties();
         properties.setProperty("metadata.broker.list", brokers);
         properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
         properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
@@ -180,8 +192,11 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("message.send.max.retries", "1");
         properties.setProperty("producer.type", "sync");
         
-        final ProducerConfig config = new ProducerConfig(properties);
-        return new Producer<>(config);
+        return new ProducerConfig(properties);
+    }
+    
+    protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+    	return new Producer<>(createConfig(context));
     }
     
     private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
@@ -201,52 +216,200 @@ public class PutKafka extends AbstractProcessor {
     	}
     	
     	final long start = System.nanoTime();
-    	final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
-    	if ( flowFile.getSize() > maxSize ) {
-    		getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
-    		session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
-    		session.transfer(flowFile, REL_REJECT);
-    		return;
-    	}
-    	
         final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+        if ( delimiter != null ) {
+            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        }
         
-        final byte[] value = new byte[(int) flowFile.getSize()];
-        session.read(flowFile, new InputStreamCallback() {
-			@Override
-			public void process(final InputStream in) throws IOException {
-				StreamUtils.fillBuffer(in, value);
-			}
-        });
-        
+        final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
         final Producer<byte[], byte[]> producer = borrowProducer(context);
-        boolean error = false;
-        try {
-        	final KeyedMessage<byte[], byte[]> message;
-        	if ( key == null ) {
-        		message = new KeyedMessage<>(topic, value);
-        	} else {
-        		message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
-        	}
-        	
-        	producer.send(message);
-        	final long nanos = System.nanoTime() - start;
-        	
-        	session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
-        	session.transfer(flowFile, REL_SUCCESS);
-        	getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
-        } catch (final Exception e) {
-        	getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
-        	session.transfer(flowFile, REL_FAILURE);
-        	error = true;
-        } finally {
-        	if ( error ) {
-        		producer.close();
-        	} else {
-        		returnProducer(producer);
-        	}
+        
+        if ( delimiter == null ) {
+            // Send the entire FlowFile as a single message.
+            final byte[] value = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+    			@Override
+    			public void process(final InputStream in) throws IOException {
+    				StreamUtils.fillBuffer(in, value);
+    			}
+            });
+            
+            boolean error = false;
+            try {
+                final KeyedMessage<byte[], byte[]> message;
+                if ( key == null ) {
+                    message = new KeyedMessage<>(topic, value);
+                } else {
+                    message = new KeyedMessage<>(topic, keyBytes, value);
+                }
+                
+                producer.send(message);
+                final long nanos = System.nanoTime() - start;
+                
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+            } catch (final Exception e) {
+                getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+                session.transfer(flowFile, REL_FAILURE);
+                error = true;
+            } finally {
+                if ( error ) {
+                    producer.close();
+                } else {
+                    returnProducer(producer);
+                }
+            }
+        } else {
+            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
+            
+            // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see
+            // if it matches some pattern. We can use this to search for the delimiter as we read through
+            // the stream of bytes in the FlowFile
+            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
+            
+            boolean error = false;
+            final LongHolder lastMessageOffset = new LongHolder(0L);
+            final LongHolder messagesSent = new LongHolder(0L);
+            
+            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        byte[] data = null; // contents of a single message
+                        
+                        boolean streamFinished = false;
+                        
+                        final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
+                        long messageBytes = 0L; // size of messages in the 'messages' list
+                        
+                        int nextByte;
+                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
+                             final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+                            
+                            // read until we're out of data.
+                            while (!streamFinished) {
+                                nextByte = in.read();
+
+                                if ( nextByte > -1 ) {
+                                    baos.write(nextByte);
+                                }
+                                
+                                if (nextByte == -1) {
+                                    // we ran out of data. This message is complete.
+                                    data = baos.toByteArray();
+                                    streamFinished = true;
+                                } else if ( buffer.addAndCompare((byte) nextByte) ) {
+                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
+                                    // underlying BAOS exception for the last 'delimiterBytes.length' bytes because we don't want
+                                    // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
+                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
+                                }
+                                
+                                createMessage: if ( data != null ) {
+                                    // If the message has no data, ignore it.
+                                    if ( data.length == 0 ) {
+                                        data = null;
+                                        baos.reset();
+                                        break createMessage;
+                                    }
+                                    
+                                    // either we ran out of data or we reached the end of the message. 
+                                    // Either way, create the message because it's ready to send.
+                                    final KeyedMessage<byte[], byte[]> message;
+                                    if ( key == null ) {
+                                        message = new KeyedMessage<>(topic, data);
+                                    } else {
+                                        message = new KeyedMessage<>(topic, keyBytes, data);
+                                    }
+                                    
+                                    // Add the message to the list of messages ready to send. If we've reached our
+                                    // threshold of how many we're willing to send (or if we're out of data), go ahead
+                                    // and send the whole List.
+                                    messages.add(message);
+                                    messageBytes += data.length;
+                                    if ( messageBytes >= maxBufferSize || streamFinished ) {
+                                        // send the messages, then reset our state.
+                                        try {
+                                            producer.send(messages);
+                                        } catch (final Exception e) {
+                                            // we wrap the general exception in ProcessException because we want to separate
+                                            // failures in sending messages from general Exceptions that would indicate bugs
+                                            // in the Processor. Failure to send a message should be handled appropriately, but
+                                            // we don't want to catch the general Exception or RuntimeException in order to catch
+                                            // failures from Kafka's Producer.
+                                            throw new ProcessException("Failed to send messages to Kafka", e);
+                                        }
+                                        
+                                        messagesSent.addAndGet(messages.size());    // count number of messages sent
+                                        
+                                        // reset state
+                                        messages.clear();
+                                        messageBytes = 0;
+                                        
+                                        // We've successfully sent a batch of messages. Keep track of the byte offset in the
+                                        // FlowFile of the last successfully sent message. This way, if the messages cannot
+                                        // all be successfully sent, we know where to split off the data. This allows us to then
+                                        // split off the first X number of bytes and send to 'success' and then split off the rest
+                                        // and send them to 'failure'.
+                                        lastMessageOffset.set(in.getBytesConsumed());
+                                    }
+                                    
+                                    // reset BAOS so that we can start a new message.
+                                    baos.reset();
+                                    data = null;
+                                }
+                            }
+
+                            // If there are messages left, send them
+                            if ( !messages.isEmpty() ) {
+                                producer.send(messages);
+                            }
+                        }
+                    }
+                });
+                
+                final long nanos = System.nanoTime() - start;
+                
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+            } catch (final ProcessException pe) {
+                error = true;
+                
+                // There was a failure sending messages to Kafka. Iff the lastMessageOffset is 0, then all of them failed and we can
+                // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
+                // 'success' while we send the others to 'failure'.
+                final long offset = lastMessageOffset.get();
+                if ( offset == 0L ) {
+                    // all of the messages failed to send. Route FlowFile to failure
+                    getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, pe.getCause()});
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
+                    final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
+                    final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
+                    
+                    getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
+                         messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
+                    
+                    session.transfer(successfulMessages, REL_SUCCESS);
+                    session.transfer(failedMessages, REL_FAILURE);
+                    session.remove(flowFile);
+                    session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
+                }
+            } finally {
+                if ( error ) {
+                    producer.close();
+                } else {
+                    returnProducer(producer);
+                }
+            }
+            
         }
     }
-
+    
 }

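The delimiter-splitting logic above streams through the FlowFile with NiFi's BufferedInputStream, ByteCountingInputStream, and NonThreadSafeCircularBuffer, and batches messages by <Max Buffer Size>. The following stand-alone sketch (illustrative only; names are hypothetical, and it omits the empty-message skipping, batching, and offset bookkeeping of the real code) shows the core idea of reading byte-by-byte and cutting a message each time the delimiter is seen:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class DelimiterSplitSketch {

        // Reads the stream one byte at a time and cuts a message whenever the bytes
        // accumulated so far end with the delimiter; the delimiter itself is dropped.
        static List<byte[]> split(final InputStream in, final byte[] delimiter) throws IOException {
            final List<byte[]> messages = new ArrayList<>();
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();

            int nextByte;
            while ((nextByte = in.read()) != -1) {
                baos.write(nextByte);
                final byte[] soFar = baos.toByteArray();
                final int start = soFar.length - delimiter.length;
                if (start >= 0 && Arrays.equals(Arrays.copyOfRange(soFar, start, soFar.length), delimiter)) {
                    messages.add(Arrays.copyOfRange(soFar, 0, start));
                    baos.reset();
                }
            }

            if (baos.size() > 0) {
                messages.add(baos.toByteArray());   // trailing data without a delimiter is still a message
            }
            return messages;
        }

        public static void main(final String[] args) throws IOException {
            final byte[] content = "message 1\nmessage 2\nmessage 3".getBytes(StandardCharsets.UTF_8);
            for (final byte[] message : split(new ByteArrayInputStream(content), "\n".getBytes(StandardCharsets.UTF_8))) {
                System.out.println(new String(message, StandardCharsets.UTF_8));
            }
        }
    }
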
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
index d429d6b..279dd75 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -54,21 +54,23 @@
             </thead>
             <tbody>
                 <tr>
-                    <td>kafka.key</td>
-                    <td>The key of the Kafka message, if it exists. If the message does not have a key,
-                    this attribute will not be added.</td>
-                </tr>
-                <tr>
                 	<td>kafka.topic</td>
                 	<td>The name of the Kafka Topic from which the message was received</td>
                 </tr>
                 <tr>
+                    <td>kafka.key</td>
+                    <td>The key of the Kafka message, if it exists and batch size is 1. If the message does not have a key,
+                    	or if the batch size is greater than 1, this attribute will not be added.</td>
+                </tr>
+                <tr>
                 	<td>kafka.partition</td>
-                	<td>The partition of the Kafka Topic from which the message was received</td>
+                	<td>The partition of the Kafka Topic from which the message was received. This attribute is added only
+                		if the batch size is 1.</td>
                 </tr>
                 <tr>
                 	<td>kafka.offset</td>
-                	<td>The offset of the message within the Kafka partition</td>
+                	<td>The offset of the message within the Kafka partition. This attribute is added only
+                		if the batch size is 1.</td>
                 </tr>
             </tbody>
         </table>
@@ -123,6 +125,30 @@
                     <li>Supports expression language: false</li>
                 </ul>
             </li>
+            
+            <li><strong>Batch Size</strong>
+                <ul>
+                    <li>Specifies the maximum number of messages to combine into a single FlowFile. 
+                    	These messages will be concatenated together with the &lt;Message Demarcator&gt; 
+                    	string placed between the content of each message. If the messages from Kafka 
+                    	should not be concatenated together, leave this value at 1.</li>
+                    <li>Default value: 1</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            
+            <li><strong>Message Demarcator</strong>
+                <ul>
+                    <li>Specifies the characters to use in order to demarcate multiple messages from Kafka. 
+                    	If the &lt;Batch Size&gt; property is set to 1, this value is ignored. Otherwise, for each two 
+                    	subsequent messages in the batch, this value will be placed in between them. This property will
+                    	treat "\n" as a new-line, "\r" as a carriage return and "\t" as a tab character. All other
+                    	characters are treated as literal characters.
+                    </li>
+                    <li>Default value: \n</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
             <li><strong>Client Name</strong>
                 <ul>
                     <li>Client Name to use when communicating with Kafka</li>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
index 38256c5..29b7c17 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -31,6 +31,16 @@
         	&lt;Kafka Key&gt; Property.
         </p>
 
+		<p>
+			The Processor allows the user to configure an optional Message Delimiter that
+			can be used to send many messages per FlowFile. For example, a \n could be used
+			to indicate that the contents of the FlowFile should be used to send one message
+			per line of text. If the property is not set, the entire contents of the FlowFile
+			will be sent as a single message. When using the delimiter, if some messages are
+			successfully sent but other messages fail to send, the FlowFile will be FORKed into
+			two child FlowFiles, with the successfully sent messages being routed to 'success'
+			and the messages that could not be sent going to 'failure'.
+		</p>
 
         <p>
             <strong>Properties:</strong>
@@ -45,7 +55,7 @@
                 <ul>
                     <li>
                     	A comma-separated list of known Kafka Brokers in the format 
-                    	&lgt;host&gt;:&lt;port&gt;. This list does not need to be
+                    	&lt;host&gt;:&lt;port&gt;. This list does not need to be
                     	exhaustive but provides a mechanism for determining which
                     	other nodes belong to the Kafka cluster.
                     </li>
@@ -106,6 +116,18 @@
                     <li>Supports expression language: false</li>
                 </ul>
             </li>
+            <li>Message Delimiter
+                <ul>
+                    <li>
+                    	Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. 
+                    	If not specified, the entire content of the FlowFile will be used as a single message.
+                    	If specified, the contents of the FlowFile will be split on this delimiter and each section 
+						sent as a separate Kafka message.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
             <li><strong>Communications Timeout</strong>
                 <ul>
                     <li>
@@ -116,16 +138,10 @@
                     <li>Supports expression language: false</li>
                 </ul>
             </li>
-            <li><strong>Max FlowFile Size</strong>
+            <li><strong>Max Buffer Size</strong>
                 <ul>
                     <li>
-                    	Specifies the amount of data that can be buffered to send to Kafka. Because
-                    	the contents of the FlowFile must be buffered into memory before they can
-                    	be sent to Kafka, attempting to send a very large FlowFile can cause
-                    	problems by causing the machine to run out of memory.
-                    	This helps to prevent the system from running out of memory, the PutKafka
-                    	Processor exposes a property for specifying the maximum size of a FlowFile.
-                    	If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
+                    	The maximum amount of data to buffer in memory before sending to Kafka
                     </li>
                     <li>Default value: 1 MB</li>
                     <li>Supports expression language: false</li>
@@ -148,25 +164,21 @@
             <li>success
                 <ul>
                     <li>All FlowFiles that are successfully sent to Kafka are routed 
-                    	to this relationship.
-                    </li>
-                </ul>
-            </li>
-            
-            <li>reject
-                <ul>
-                    <li>Any FlowFile whose content size exceeds the configured value for 
-                    	the &lt;Max FlowFile Size&gt; property will be routed to this 
-                    	relationship.
+                    	to this relationship. If using the &lt;Message Delimiter&gt; property,
+                    	it's possible for some messages to be sent while others fail. In this
+                    	case, only the messages that are successfully sent will be routed to
+                    	this Relationship while the other messages will be routed to the
+                    	'failure' relationship.
                     </li>
                 </ul>
             </li>
             
             <li>failure
                 <ul>
-                    <li>All FlowFiles that cannot be sent to Kafka for any reason other 
-                    	than their content size exceeding the value of the &lt;Max FlowFile 
-                    	Size&gt; property will be routed to this relationship.
+                    <li>All FlowFiles that cannot be sent to Kafka for any reason will be routed 
+                    	to this relationship. If a portion of a FlowFile is successfully sent
+                    	to Kafka but not all, only those messages that cannot be sent to Kafka
+                    	will be routed to this Relationship.
                     </li>
                 </ul>
             </li>

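To make the partial-failure behavior described above concrete (an illustrative walk-through with made-up sizes, based on the offset bookkeeping in the PutKafka.onTrigger() diff earlier in this commit): if a 100-byte FlowFile is being split on a delimiter and a batch fails to send after the last successfully sent message ended at byte offset 60, the processor clones bytes 0 through 60 into one child FlowFile routed to 'success', clones the remaining 40 bytes into a second child routed to 'failure', and removes the original FlowFile.
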
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
index 2199a9c..10560f8 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -16,20 +16,27 @@
  */
 package org.apache.nifi.processors.kafka;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
 import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
-@Ignore("Intended only for local tests to verify functionality.")
 public class TestGetKafka {
 
-	public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
 	
     @BeforeClass
     public static void configureLogging() {
@@ -39,9 +46,10 @@ public class TestGetKafka {
     }
     
     @Test
+    @Ignore("Intended only for local tests to verify functionality.")
     public void testIntegrationLocally() {
         final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
-        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181");
         runner.setProperty(GetKafka.TOPIC, "testX");
         runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
         runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
@@ -56,4 +64,99 @@ public class TestGetKafka {
         }
     }
     
+    
+    @Test
+    public void testWithDelimiter() {
+        final List<String> messages = new ArrayList<>();
+        messages.add("Hello");
+        messages.add("Good-bye");
+        
+        final TestableProcessor proc = new TestableProcessor(null, messages);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+        runner.setProperty(GetKafka.BATCH_SIZE, "2");
+        
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+        mff.assertContentEquals("Hello\nGood-bye");
+    }
+    
+    @Test
+    public void testWithDelimiterAndNotEnoughMessages() {
+        final List<String> messages = new ArrayList<>();
+        messages.add("Hello");
+        messages.add("Good-bye");
+        
+        final TestableProcessor proc = new TestableProcessor(null, messages);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+        runner.setProperty(GetKafka.BATCH_SIZE, "3");
+        
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+        mff.assertContentEquals("Hello\nGood-bye");
+    }
+    
+    
+    private static class TestableProcessor extends GetKafka {
+        private final byte[] key;
+        private final Iterator<String> messageItr;
+        
+        public TestableProcessor(final byte[] key, final List<String> messages) {
+            this.key = key;
+            messageItr = messages.iterator();
+        }
+        
+        @Override
+        public void createConsumers(ProcessContext context) {
+        }
+        
+        @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+            final ConsumerIterator<byte[], byte[]> itr = Mockito.mock(ConsumerIterator.class);
+            
+            Mockito.doAnswer(new Answer<Boolean>() {
+                @Override
+                public Boolean answer(final InvocationOnMock invocation) throws Throwable {
+                    return messageItr.hasNext();
+                }
+            }).when(itr).hasNext();
+            
+            Mockito.doAnswer(new Answer<MessageAndMetadata>() {
+                @Override
+                public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable {
+                    final MessageAndMetadata mam = Mockito.mock(MessageAndMetadata.class);
+                    Mockito.when(mam.key()).thenReturn(key);
+                    Mockito.when(mam.offset()).thenReturn(0L);
+                    Mockito.when(mam.partition()).thenReturn(0);
+                    
+                    Mockito.doAnswer(new Answer<byte[]>() {
+                        @Override
+                        public byte[] answer(InvocationOnMock invocation) throws Throwable {
+                            return messageItr.next().getBytes();
+                        }
+                        
+                    }).when(mam).message();
+                    
+                    return mam;
+                }
+            }).when(itr).next();
+            
+            return itr;
+        }
+    }
+    
 }
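
The TestableProcessor above feeds canned messages through a mocked ConsumerIterator. As a rough, stand-alone sketch of that same Mockito Answer pattern, shown against a plain java.util.Iterator so it compiles without the Kafka classes (all names below are illustrative only, not part of the commit):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    import org.mockito.Mockito;
    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class AnswerBackedIteratorSketch {

        @SuppressWarnings("unchecked")
        public static Iterator<String> canned(final List<String> messages) {
            final Iterator<String> real = messages.iterator();
            final Iterator<String> mock = Mockito.mock(Iterator.class);

            // hasNext()/next() delegate to a real iterator, so the mock simply drains the canned list
            Mockito.doAnswer(new Answer<Boolean>() {
                @Override
                public Boolean answer(final InvocationOnMock invocation) throws Throwable {
                    return real.hasNext();
                }
            }).when(mock).hasNext();

            Mockito.doAnswer(new Answer<String>() {
                @Override
                public String answer(final InvocationOnMock invocation) throws Throwable {
                    return real.next();
                }
            }).when(mock).next();

            return mock;
        }

        public static void main(final String[] args) {
            final Iterator<String> itr = canned(Arrays.asList("Hello", "Good-bye"));
            while (itr.hasNext()) {
                System.out.println(itr.next());   // prints "Hello" then "Good-bye"
            }
        }
    }

Backing the stubbed hasNext()/next() with a real iterator is what lets the delimiter/batch-size tests above consume exactly the two canned messages and then stop.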

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 2e6aacf..cf7ed68 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -1,12 +1,22 @@
 package org.apache.nifi.processors.kafka;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import kafka.common.FailedToSendMessageException;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.annotation.OnScheduled;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -14,10 +24,109 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 
-@Ignore("Intended only for local testing to verify functionality.")
 public class TestPutKafka {
 
+    @Test
+    public void testMultipleKeyValuePerFlowFile() {
+        final TestableProcessor proc = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+        
+        final List<byte[]> messages = proc.getProducer().getMessages();
+        assertEquals(11, messages.size());
+        
+        assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
+        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
+        assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
+        assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3)));
+        assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4)));
+        assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5)));
+        assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6)));
+        assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7)));
+        assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8)));
+        assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
+        assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
+    }
+    
+    
+    @Test
+    public void testWithImmediateFailure() {
+        final TestableProcessor proc = new TestableProcessor(0);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        
+        final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
+        runner.enqueue(text.getBytes());
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        mff.assertContentEquals(text);
+    }
+    
+    
+    @Test
+    public void testPartialFailure() {
+        final TestableProcessor proc = new TestableProcessor(2);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+        
+        final byte[] bytes = "1\n2\n3\n4".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+        
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        successFF.assertContentEquals("1\n2\n");
+        
+        final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        failureFF.assertContentEquals("3\n4");
+    }
+    
+    
+    @Test
+    public void testWithEmptyMessages() {
+        final TestableProcessor proc = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+
+        final List<byte[]> msgs = proc.getProducer().getMessages();
+        assertEquals(4, msgs.size());
+        assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
+        assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
+        assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
+        assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
+    }
+    
+    
 	@Test
+	@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
 	public void testKeyValuePut() {
 		final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
 		runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
@@ -45,4 +154,67 @@ public class TestPutKafka {
 		assertTrue(Arrays.equals(data, mff.toByteArray()));
 	}
 	
+	
+	private static class TestableProcessor extends PutKafka {
+	    private MockProducer producer;
+	    private int failAfter = Integer.MAX_VALUE;
+	    
+	    public TestableProcessor() {
+	    }
+	    
+	    public TestableProcessor(final int failAfter) {
+	        this.failAfter = failAfter;
+	    }
+	    
+	    @OnScheduled
+	    public void instantiateProducer(final ProcessContext context) {
+	        producer = new MockProducer(createConfig(context));
+	        producer.setFailAfter(failAfter);
+	    }
+	    
+	    @Override
+	    protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+	        return producer;
+	    }
+	    
+	    public MockProducer getProducer() {
+	        return producer;
+	    }
+	}
+	
+	
+	private static class MockProducer extends Producer<byte[], byte[]> {
+	    private int sendCount = 0;
+	    private int failAfter = Integer.MAX_VALUE;
+	    
+	    private final List<byte[]> messages = new ArrayList<>();
+	    
+        public MockProducer(final ProducerConfig config) {
+            super(config);
+        }
+	    
+        @Override
+        public void send(final KeyedMessage<byte[], byte[]> message) {
+            if ( ++sendCount > failAfter ) {
+                throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
+            } else {
+                messages.add(message.message());
+            }
+        }
+        
+        public List<byte[]> getMessages() {
+            return messages;
+        }
+        
+        @Override
+        public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
+            for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
+                send(msg);
+            }
+        }
+        
+        public void setFailAfter(final int successCount) {
+            failAfter = successCount;
+        }
+	}
 }
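
MockProducer above simulates a broker that starts failing after a configurable number of sends, which is what drives testWithImmediateFailure and testPartialFailure. A minimal stand-alone sketch of that fail-after-N idea (hypothetical names, not the committed code):

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class FailAfterSinkSketch {

        static class FailAfterSink {
            private final int failAfter;               // sends that succeed before failures begin
            private int sendCount = 0;
            private final List<byte[]> accepted = new ArrayList<>();

            FailAfterSink(final int failAfter) {
                this.failAfter = failAfter;
            }

            void send(final byte[] message) {
                if (++sendCount > failAfter) {
                    throw new RuntimeException("Simulated send failure after " + failAfter + " messages");
                }
                accepted.add(message);
            }

            List<byte[]> getAccepted() {
                return accepted;
            }
        }

        public static void main(final String[] args) {
            final FailAfterSink sink = new FailAfterSink(2);
            for (final String msg : new String[] {"1", "2", "3", "4"}) {
                try {
                    sink.send(msg.getBytes(StandardCharsets.UTF_8));
                } catch (final RuntimeException e) {
                    System.out.println("send of '" + msg + "' failed: " + e.getMessage());
                }
            }
            System.out.println("accepted " + sink.getAccepted().size() + " messages");   // accepted 2 messages
        }
    }

With failAfter = 2, the first two sends are recorded and the rest throw, mirroring how testPartialFailure expects "1\n2\n" to route to success and "3\n4" to failure.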


[3/7] incubator-nifi git commit: NIFI-226: when pre-fetching FlowFiles from the queue, if a FlowFile is penalized, it is lost

Posted by ma...@apache.org.
NIFI-226: when pre-fetching FlowFiles from the queue, if a FlowFile is penalized, it is lost


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/a2ecfe33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/a2ecfe33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/a2ecfe33

Branch: refs/heads/develop
Commit: a2ecfe3355f1ac75285e42d3f34bb9761723cc5b
Parents: 68b7ad7
Author: Mark Payne <ma...@hotmail.com>
Authored: Mon Jan 5 15:36:40 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Mon Jan 5 15:36:40 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/controller/StandardFlowFileQueue.java    | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------
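
As a rough illustration of the bug and of the three-line fix in the diff below (a simplified stand-in, not the actual StandardFlowFileQueue code; types and names are hypothetical): the pre-fetch loop polls records off the active queue into a buffer and stops when it hits a penalized record, but before the fix the record it had just polled was never put back, so it silently disappeared from the queue.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;

    public class PrefetchSketch {

        static class Record {
            final String id;
            final boolean penalized;
            Record(final String id, final boolean penalized) {
                this.id = id;
                this.penalized = penalized;
            }
        }

        static void prefetch(final Queue<Record> activeQueue, final List<Record> prefetched, final int count) {
            final List<Record> buffer = new ArrayList<>();
            for (int i = 0; i < count; i++) {
                final Record record = activeQueue.poll();
                if (record == null || record.penalized) {
                    // not enough unpenalized records to pull: put everything back and return
                    activeQueue.addAll(buffer);
                    if (record != null) {
                        activeQueue.add(record);   // the fix: re-queue the penalized record as well
                    }
                    return;
                }
                buffer.add(record);
            }
            prefetched.addAll(buffer);
        }

        public static void main(final String[] args) {
            final Queue<Record> queue = new ArrayDeque<>();
            queue.add(new Record("a", false));
            queue.add(new Record("b", true));   // penalized: pre-fetch must stop here without losing it
            queue.add(new Record("c", false));

            final List<Record> prefetched = new ArrayList<>();
            prefetch(queue, prefetched, 3);
            System.out.println("prefetched=" + prefetched.size() + ", still queued=" + queue.size());   // 0, 3
        }
    }

Re-adding the just-polled record alongside the buffered ones is the entire fix.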


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/a2ecfe33/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 3b880bb..59d2308 100644
--- a/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -1011,6 +1011,9 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
             if (record == null || record.isPenalized()) {
                 // not enough unpenalized records to pull. Put all records back and return
                 activeQueue.addAll(buffer);
+                if ( record != null ) {
+                    activeQueue.add(record);
+                }
                 return;
             } else {
                 buffer.add(record);