Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/05 03:10:41 UTC

incubator-nifi git commit: NIFI-220: Allow for demarcator to be specified for Kafka Get and Put and added unit tests; updated docs

Repository: incubator-nifi
Updated Branches:
  refs/heads/NIFI-220 3e2f79067 -> c91c7e789


NIFI-220: Allow for demarcator to be specified for Kafka Get and Put and added unit tests; updated docs


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c91c7e78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c91c7e78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c91c7e78

Branch: refs/heads/NIFI-220
Commit: c91c7e78970a5261b05572dfbad02ee91287bf5e
Parents: 3e2f790
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Jan 4 21:10:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Jan 4 21:10:34 2015 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/GetKafka.java  | 127 +++++---
 .../apache/nifi/processors/kafka/PutKafka.java  | 287 +++++++++++++++----
 .../index.html                                  |  40 ++-
 .../index.html                                  |  56 ++--
 .../nifi/processors/kafka/TestGetKafka.java     | 109 ++++++-
 .../nifi/processors/kafka/TestPutKafka.java     | 174 ++++++++++-
 6 files changed, 662 insertions(+), 131 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 55c67e3..ea4296e 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -24,6 +24,7 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
@@ -81,6 +82,26 @@ public class GetKafka extends AbstractProcessor {
 	    .expressionLanguageSupported(false)
 	    .defaultValue("30 secs")
 	    .build();
+    public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
+        .name("Batch Size")
+        .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be "
+                + "concatenated together with the <Message Demarcator> string placed between the content of each message. "
+                + "If the messages from Kafka should not be concatenated together, leave this value at 1.")
+        .required(true)
+        .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1")
+        .build();
+    public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
+        .name("Message Demarcator")
+        .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> "
+                + "property is set to 1, this value is ignored. Otherwise, this value will be placed between "
+                + "each pair of subsequent messages in the batch.")
+        .required(true)
+        .addValidator(Validator.VALID)  // accept anything as a demarcator, including empty string
+        .expressionLanguageSupported(false)
+        .defaultValue("\\n")
+        .build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
         .name("Client Name")
         .description("Client Name to use when communicating with Kafka")
@@ -113,6 +134,8 @@ public class GetKafka extends AbstractProcessor {
         props.add(ZOOKEEPER_CONNECTION_STRING);
         props.add(TOPIC);
         props.add(ZOOKEEPER_COMMIT_DELAY);
+        props.add(BATCH_SIZE);
+        props.add(MESSAGE_DEMARCATOR);
         props.add(clientNameWithDefault);
         props.add(KAFKA_TIMEOUT);
         props.add(ZOOKEEPER_TIMEOUT);
@@ -181,15 +204,25 @@ public class GetKafka extends AbstractProcessor {
     	}
     }
     
+    protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+        return streamIterators.poll();
+    }
+    
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-    	ConsumerIterator<byte[], byte[]> iterator = streamIterators.poll();
+    	ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
     	if ( iterator == null ) {
     		return;
     	}
     	
+    	final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
+    	final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+    	final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
+    	final String topic = context.getProperty(TOPIC).getValue();
+    	
     	FlowFile flowFile = null;
     	try {
+    	    // add the current thread to the Set of those to be interrupted if the processor is stopped.
     		interruptionLock.lock();
     		try {
     			interruptableThreads.add(Thread.currentThread());
@@ -197,52 +230,73 @@ public class GetKafka extends AbstractProcessor {
     			interruptionLock.unlock();
     		}
     		
-    		try {
-	    		if (!iterator.hasNext() ) {
-	    			return;
-	    		}
-    		} catch (final Exception e) {
-    			getLogger().warn("Failed to invoke hasNext() due to ", new Object[] {e});
-    			iterator = null;
-    			return;
-    		}
-    		
     		final long start = System.nanoTime();
-    		final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-    		
-    		if ( mam == null ) {
-    			return;
-    		}
-    		
-    		final byte[] key = mam.key();
+    		flowFile = session.create();
     		
     		final Map<String, String> attributes = new HashMap<>();
-    		if ( key != null ) {
-    			attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+            attributes.put("kafka.topic", topic);
+
+            int numMessages = 0;
+    		for (int msgCount = 0; msgCount < batchSize; msgCount++) {
+    		    // if the processor is stopped, iterator.hasNext() will throw an Exception.
+    		    // In this case, we just break out of the loop.
+    		    try {
+        		    if ( !iterator.hasNext() ) {
+        		        break;
+        		    }
+    		    } catch (final Exception e) {
+    		        break;
+    		    }
+    		    
+        		final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
+        		if ( mam == null ) {
+        			return;
+        		}
+        		
+        		final byte[] key = mam.key();
+        		
+        		if ( batchSize == 1 ) {
+        		    // the kafka.key, kafka.offset, and kafka.partition attributes are added only
+        		    // for a batch size of 1.
+        		    if ( key != null ) {
+        		        attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8));
+        		    }
+        		    
+            		attributes.put("kafka.offset", String.valueOf(mam.offset()));
+            		attributes.put("kafka.partition", String.valueOf(mam.partition()));
+        		}
+        		
+        		// add the message to the FlowFile's contents
+        		final boolean firstMessage = (msgCount == 0);
+        		flowFile = session.append(flowFile, new OutputStreamCallback() {
+    				@Override
+    				public void process(final OutputStream out) throws IOException {
+    				    if ( !firstMessage ) {
+    				        out.write(demarcatorBytes);
+    				    }
+    					out.write(mam.message());
+    				}
+        		});
+        		numMessages++;
     		}
-    		attributes.put("kafka.offset", String.valueOf(mam.offset()));
-    		attributes.put("kafka.partition", String.valueOf(mam.partition()));
-    		attributes.put("kafka.topic", mam.topic());
     		
-    		flowFile = session.create();
-    		flowFile = session.write(flowFile, new OutputStreamCallback() {
-				@Override
-				public void process(final OutputStream out) throws IOException {
-					out.write(mam.message());
-				}
-    		});
-    		
-    		flowFile = session.putAllAttributes(flowFile, attributes);
-    		final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-    		session.getProvenanceReporter().receive(flowFile, "kafka://" + mam.topic() + "/partitions/" + mam.partition() + "/offsets/" + mam.offset(), millis);
-    		getLogger().info("Successfully received {} from Kafka in {} millis", new Object[] {flowFile, millis});
-    		session.transfer(flowFile, REL_SUCCESS);
+    		// If we received no messages, remove the FlowFile. Otherwise, send to success.
+    		if ( flowFile.getSize() == 0L ) {
+    		    session.remove(flowFile);
+    		} else {
+        		flowFile = session.putAllAttributes(flowFile, attributes);
+        		final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        		session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, millis);
+        		getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[] {flowFile, numMessages, millis});
+        		session.transfer(flowFile, REL_SUCCESS);
+    		}
     	} catch (final Exception e) {
     		getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[] {e});
     		if ( flowFile != null ) {
     			session.remove(flowFile);
     		}
     	} finally {
+    	    // Remove the current thread from the Set of Threads to interrupt.
     		interruptionLock.lock();
     		try {
     			interruptableThreads.remove(Thread.currentThread());
@@ -250,6 +304,7 @@ public class GetKafka extends AbstractProcessor {
     			interruptionLock.unlock();
     		}
     		
+    		// Add the iterator back to the queue
     		if ( iterator != null ) {
     			streamIterators.offer(iterator);
     		}

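The batching logic above can be illustrated with a short standalone sketch (not part of
this commit; the class and method names below are hypothetical). It shows the two pieces
that combine in onTrigger(): the user-typed escape sequences \n, \r, and \t in the
demarcator property are translated into real control characters, and up to <Batch Size>
messages are concatenated with the demarcator bytes placed between each pair:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.List;

    public class DemarcatorSketch {

        static byte[] batch(final List<byte[]> messages, final String rawDemarcator, final int batchSize) throws IOException {
            // same escape handling as the processor: typed "\n", "\r", "\t" become control characters
            final String demarcator = rawDemarcator.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
            final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);

            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            int count = 0;
            for (final byte[] message : messages) {
                if (count >= batchSize) {
                    break;
                }
                if (count > 0) {
                    baos.write(demarcatorBytes);  // demarcator goes between messages, never before the first
                }
                baos.write(message);
                count++;
            }
            return baos.toByteArray();
        }

        public static void main(final String[] args) throws IOException {
            final List<byte[]> msgs = Arrays.asList(
                "Hello".getBytes(StandardCharsets.UTF_8),
                "Good-bye".getBytes(StandardCharsets.UTF_8));
            // prints "Hello" and "Good-bye" on separate lines, matching the unit tests below
            System.out.println(new String(batch(msgs, "\\n", 2), StandardCharsets.UTF_8));
        }
    }
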
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 5e5940c..4b5a742 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -48,7 +48,14 @@ import org.apache.nifi.processor.annotation.Tags;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.BufferedInputStream;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer;
+import org.apache.nifi.util.LongHolder;
+
+import java.util.Arrays;
 
 @SupportsBatching
 @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"})
@@ -90,6 +97,24 @@ public class PutKafka extends AbstractProcessor {
 		.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
 		.defaultValue(DELIVERY_BEST_EFFORT.getValue())
 		.build();
+    public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder()
+            .name("Message Delimiter")
+            .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. "
+                    + "If not specified, the entire content of the FlowFile will be used as a single message. "
+                    + "If specified, the contents of the FlowFile will be split on this delimiter and each section "
+                    + "sent as a separate Kafka message.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+    public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
+        .name("Max Buffer Size")
+        .description("The maximum amount of data to buffer in memory before sending to Kafka")
+        .required(true)
+        .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+        .expressionLanguageSupported(false)
+        .defaultValue("1 MB")
+        .build();
     public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
 	    .name("Communications Timeout")
 	    .description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
@@ -98,14 +123,6 @@ public class PutKafka extends AbstractProcessor {
 	    .expressionLanguageSupported(false)
 	    .defaultValue("30 secs")
 	    .build();
-    public static final PropertyDescriptor MAX_FLOWFILE_SIZE = new PropertyDescriptor.Builder()
-		.name("Max FlowFile Size")
-		.description("Specifies the amount of data that can be buffered to send to Kafka. If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'. This helps to prevent the system from running out of memory")
-		.required(true)
-		.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-		.expressionLanguageSupported(false)
-		.defaultValue("1 MB")
-		.build();
     public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
 	    .name("Client Name")
 	    .description("Client Name to use when communicating with Kafka")
@@ -123,10 +140,6 @@ public class PutKafka extends AbstractProcessor {
 	    .name("failure")
 	    .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship")
 	    .build();
-    public static final Relationship REL_REJECT = new Relationship.Builder()
-	    .name("reject")
-	    .description("Any FlowFile whose size exceeds the <Max FlowFile Size> property will be routed to this Relationship")
-	    .build();
 
     private final BlockingQueue<Producer<byte[], byte[]>> producers = new LinkedBlockingQueue<>();
     
@@ -142,8 +155,9 @@ public class PutKafka extends AbstractProcessor {
         props.add(TOPIC);
         props.add(KEY);
         props.add(DELIVERY_GUARANTEE);
+        props.add(MESSAGE_DELIMITER);
+        props.add(MAX_BUFFER_SIZE);
         props.add(TIMEOUT);
-        props.add(MAX_FLOWFILE_SIZE);
         props.add(clientName);
         return props;
     }
@@ -153,7 +167,6 @@ public class PutKafka extends AbstractProcessor {
         final Set<Relationship> relationships = new HashSet<>(1);
         relationships.add(REL_SUCCESS);
         relationships.add(REL_FAILURE);
-        relationships.add(REL_REJECT);
         return relationships;
     }
     
@@ -167,11 +180,10 @@ public class PutKafka extends AbstractProcessor {
     	}
     }
     
-    
-    private Producer<byte[], byte[]> createProducer(final ProcessContext context) {
-    	final String brokers = context.getProperty(SEED_BROKERS).getValue();
+    protected ProducerConfig createConfig(final ProcessContext context) {
+        final String brokers = context.getProperty(SEED_BROKERS).getValue();
 
-    	final Properties properties = new Properties();
+        final Properties properties = new Properties();
         properties.setProperty("metadata.broker.list", brokers);
         properties.setProperty("request.required.acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
         properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
@@ -180,8 +192,11 @@ public class PutKafka extends AbstractProcessor {
         properties.setProperty("message.send.max.retries", "1");
         properties.setProperty("producer.type", "sync");
         
-        final ProducerConfig config = new ProducerConfig(properties);
-        return new Producer<>(config);
+        return new ProducerConfig(properties);
+    }
+    
+    protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+    	return new Producer<>(createConfig(context));
     }
     
     private Producer<byte[], byte[]> borrowProducer(final ProcessContext context) {
@@ -201,52 +216,200 @@ public class PutKafka extends AbstractProcessor {
     	}
     	
     	final long start = System.nanoTime();
-    	final long maxSize = context.getProperty(MAX_FLOWFILE_SIZE).asDataSize(DataUnit.B).longValue();
-    	if ( flowFile.getSize() > maxSize ) {
-    		getLogger().info("Routing {} to 'reject' because its size exceeds the configured maximum allowed size", new Object[] {flowFile});
-    		session.getProvenanceReporter().route(flowFile, REL_REJECT, "FlowFile is larger than " + maxSize);
-    		session.transfer(flowFile, REL_REJECT);
-    		return;
-    	}
-    	
         final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue();
         final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue();
+        final byte[] keyBytes = (key == null) ? null : key.getBytes(StandardCharsets.UTF_8);
+        String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue();
+        if ( delimiter != null ) {
+            delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
+        }
         
-        final byte[] value = new byte[(int) flowFile.getSize()];
-        session.read(flowFile, new InputStreamCallback() {
-			@Override
-			public void process(final InputStream in) throws IOException {
-				StreamUtils.fillBuffer(in, value);
-			}
-        });
-        
+        final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue();
         final Producer<byte[], byte[]> producer = borrowProducer(context);
-        boolean error = false;
-        try {
-        	final KeyedMessage<byte[], byte[]> message;
-        	if ( key == null ) {
-        		message = new KeyedMessage<>(topic, value);
-        	} else {
-        		message = new KeyedMessage<>(topic, key.getBytes(StandardCharsets.UTF_8), value);
-        	}
-        	
-        	producer.send(message);
-        	final long nanos = System.nanoTime() - start;
-        	
-        	session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
-        	session.transfer(flowFile, REL_SUCCESS);
-        	getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
-        } catch (final Exception e) {
-        	getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
-        	session.transfer(flowFile, REL_FAILURE);
-        	error = true;
-        } finally {
-        	if ( error ) {
-        		producer.close();
-        	} else {
-        		returnProducer(producer);
-        	}
+        
+        if ( delimiter == null ) {
+            // Send the entire FlowFile as a single message.
+            final byte[] value = new byte[(int) flowFile.getSize()];
+            session.read(flowFile, new InputStreamCallback() {
+    			@Override
+    			public void process(final InputStream in) throws IOException {
+    				StreamUtils.fillBuffer(in, value);
+    			}
+            });
+            
+            boolean error = false;
+            try {
+                final KeyedMessage<byte[], byte[]> message;
+                if ( key == null ) {
+                    message = new KeyedMessage<>(topic, value);
+                } else {
+                    message = new KeyedMessage<>(topic, keyBytes, value);
+                }
+                
+                producer.send(message);
+                final long nanos = System.nanoTime() - start;
+                
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+            } catch (final Exception e) {
+                getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, e});
+                session.transfer(flowFile, REL_FAILURE);
+                error = true;
+            } finally {
+                if ( error ) {
+                    producer.close();
+                } else {
+                    returnProducer(producer);
+                }
+            }
+        } else {
+            final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8);
+            
+            // The NonThreadSafeCircularBuffer allows us to add bytes from the stream one at a time and see
+            // whether they complete some pattern. We can use this to search for the delimiter as we read
+            // through the stream of bytes in the FlowFile.
+            final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes);
+            
+            boolean error = false;
+            final LongHolder lastMessageOffset = new LongHolder(0L);
+            final LongHolder messagesSent = new LongHolder(0L);
+            
+            try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+                session.read(flowFile, new InputStreamCallback() {
+                    @Override
+                    public void process(final InputStream rawIn) throws IOException {
+                        byte[] data = null; // contents of a single message
+                        
+                        boolean streamFinished = false;
+                        
+                        final List<KeyedMessage<byte[], byte[]>> messages = new ArrayList<>(); // batch to send
+                        long messageBytes = 0L; // size of messages in the 'messages' list
+                        
+                        int nextByte;
+                        try (final InputStream bufferedIn = new BufferedInputStream(rawIn);
+                             final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) {
+                            
+                            // read until we're out of data.
+                            while (!streamFinished) {
+                                nextByte = in.read();
+
+                                if ( nextByte > -1 ) {
+                                    baos.write(nextByte);
+                                }
+                                
+                                if (nextByte == -1) {
+                                    // we ran out of data. This message is complete.
+                                    data = baos.toByteArray();
+                                    streamFinished = true;
+                                } else if ( buffer.addAndCompare((byte) nextByte) ) {
+                                    // we matched our delimiter. This message is complete. We want all of the bytes from the
+                                    // underlying BAOS except for the last 'delimiterBytes.length' bytes because we don't want
+                                    // the delimiter itself to be sent.
+                                    data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length);
+                                }
+                                
+                                createMessage: if ( data != null ) {
+                                    // If the message has no data, ignore it.
+                                    if ( data.length == 0 ) {
+                                        data = null;
+                                        baos.reset();
+                                        break createMessage;
+                                    }
+                                    
+                                    // either we ran out of data or we reached the end of the message. 
+                                    // Either way, create the message because it's ready to send.
+                                    final KeyedMessage<byte[], byte[]> message;
+                                    if ( key == null ) {
+                                        message = new KeyedMessage<>(topic, data);
+                                    } else {
+                                        message = new KeyedMessage<>(topic, keyBytes, data);
+                                    }
+                                    
+                                    // Add the message to the list of messages ready to send. If we've reached our
+                                    // threshold of how many we're willing to send (or if we're out of data), go ahead
+                                    // and send the whole List.
+                                    messages.add(message);
+                                    messageBytes += data.length;
+                                    if ( messageBytes >= maxBufferSize || streamFinished ) {
+                                        // send the messages, then reset our state.
+                                        try {
+                                            producer.send(messages);
+                                        } catch (final Exception e) {
+                                            // we wrap the general exception in ProcessException because we want to separate
+                                            // failures in sending messages from general Exceptions that would indicate bugs
+                                            // in the Processor. Failure to send a message should be handled by the caller, which
+                                            // catches ProcessException specifically rather than the general Exception or
+                                            // RuntimeException.
+                                            throw new ProcessException("Failed to send messages to Kafka", e);
+                                        }
+                                        
+                                        messagesSent.addAndGet(messages.size());    // count number of messages sent
+                                        
+                                        // reset state
+                                        messages.clear();
+                                        messageBytes = 0;
+                                        
+                                        // We've successfully sent a batch of messages. Keep track of the byte offset in the
+                                        // FlowFile of the last successfully sent message. This way, if the messages cannot
+                                        // all be successfully sent, we know where to split off the data. This allows us to then
+                                        // split off the first X number of bytes and send to 'success' and then split off the rest
+                                        // and send them to 'failure'.
+                                        lastMessageOffset.set(in.getBytesConsumed());
+                                    }
+                                    
+                                    // reset BAOS so that we can start a new message.
+                                    baos.reset();
+                                    data = null;
+                                }
+                            }
+
+                            // If there are messages left, send them
+                            if ( !messages.isEmpty() ) {
+                                producer.send(messages);
+                            }
+                        }
+                    }
+                });
+                
+                final long nanos = System.nanoTime() - start;
+                
+                session.getProvenanceReporter().send(flowFile, "kafka://" + topic);
+                session.transfer(flowFile, REL_SUCCESS);
+                getLogger().info("Successfully sent {} to Kafka in {} millis", new Object[] {flowFile, TimeUnit.NANOSECONDS.toMillis(nanos)});
+            } catch (final ProcessException pe) {
+                error = true;
+                
+                // There was a failure sending messages to Kafka. If the lastMessageOffset is 0, then all of them failed and we can
+                // just route the FlowFile to failure. Otherwise, some messages were successful, so split them off and send them to
+                // 'success' while we send the others to 'failure'.
+                final long offset = lastMessageOffset.get();
+                if ( offset == 0L ) {
+                    // all of the messages failed to send. Route FlowFile to failure
+                    getLogger().error("Failed to send {} to Kafka due to {}; routing to failure", new Object[] {flowFile, pe.getCause()});
+                    session.transfer(flowFile, REL_FAILURE);
+                } else {
+                    // Some of the messages were sent successfully. We want to split off the successful messages from the failed messages.
+                    final FlowFile successfulMessages = session.clone(flowFile, 0L, offset);
+                    final FlowFile failedMessages = session.clone(flowFile, offset, flowFile.getSize() - offset);
+                    
+                    getLogger().error("Successfully sent {} of the messages from {} but then failed to send the rest. Original FlowFile split into two: {} routed to 'success', {} routed to 'failure'. Failure was due to {}", new Object[] {
+                         messagesSent.get(), flowFile, successfulMessages, failedMessages, pe.getCause() });
+                    
+                    session.transfer(successfulMessages, REL_SUCCESS);
+                    session.transfer(failedMessages, REL_FAILURE);
+                    session.remove(flowFile);
+                    session.getProvenanceReporter().send(successfulMessages, "kafka://" + topic);
+                }
+            } finally {
+                if ( error ) {
+                    producer.close();
+                } else {
+                    returnProducer(producer);
+                }
+            }
+            
         }
     }
-
+    
 }

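The delimiter scanning above relies on NiFi's NonThreadSafeCircularBuffer to test, one
byte at a time, whether the bytes just read end with the delimiter. As a rough sketch of
that addAndCompare() idea (an assumption-laden stand-in, not the actual NiFi class): keep
a rolling window of the last pattern-length bytes and compare it against the pattern on
every added byte:

    public class CircularMatcherSketch {

        static class CircularMatcher {
            private final byte[] pattern;
            private final byte[] window;   // circular buffer of the most recent bytes
            private int insertionPoint = 0;
            private long bytesSeen = 0;

            CircularMatcher(final byte[] pattern) {
                this.pattern = pattern;
                this.window = new byte[pattern.length];
            }

            // Add one byte; return true if the last pattern.length bytes equal the pattern.
            boolean addAndCompare(final byte b) {
                window[insertionPoint] = b;
                insertionPoint = (insertionPoint + 1) % window.length;
                bytesSeen++;
                if (bytesSeen < pattern.length) {
                    return false;  // not enough data yet to possibly match
                }
                for (int i = 0; i < pattern.length; i++) {
                    if (window[(insertionPoint + i) % window.length] != pattern[i]) {
                        return false;
                    }
                }
                return true;
            }
        }

        public static void main(final String[] args) {
            final CircularMatcher matcher = new CircularMatcher("\n".getBytes());
            int delimiters = 0;
            for (final byte b : "1\n2\n3".getBytes()) {
                if (matcher.addAndCompare(b)) {
                    delimiters++;
                }
            }
            System.out.println(delimiters + " delimiters found");  // prints: 2 delimiters found
        }
    }
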
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
index d429d6b..279dd75 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.GetKafka/index.html
@@ -54,21 +54,23 @@
             </thead>
             <tbody>
                 <tr>
-                    <td>kafka.key</td>
-                    <td>The key of the Kafka message, if it exists. If the message does not have a key,
-                    this attribute will not be added.</td>
-                </tr>
-                <tr>
                 	<td>kafka.topic</td>
                 	<td>The name of the Kafka Topic from which the message was received</td>
                 </tr>
                 <tr>
+                    <td>kafka.key</td>
+                    <td>The key of the Kafka message, if it exists and the batch size is 1. If the message does not have a key,
+                    	or if the batch size is greater than 1, this attribute will not be added.</td>
+                </tr>
+                <tr>
                 	<td>kafka.partition</td>
-                	<td>The partition of the Kafka Topic from which the message was received</td>
+                	<td>The partition of the Kafka Topic from which the message was received. This attribute is added only
+                		if the batch size is 1.</td>
                 </tr>
                 <tr>
                 	<td>kafka.offset</td>
-                	<td>The offset of the message within the Kafka partition</td>
+                	<td>The offset of the message within the Kafka partition. This attribute is added only
+                		if the batch size is 1.</td>
                 </tr>
             </tbody>
         </table>
@@ -123,6 +125,30 @@
                     <li>Supports expression language: false</li>
                 </ul>
             </li>
+            
+            <li><strong>Batch Size</strong>
+                <ul>
+                    <li>Specifies the maximum number of messages to combine into a single FlowFile. 
+                    	These messages will be concatenated together with the &lt;Message Demarcator&gt; 
+                    	string placed between the content of each message. If the messages from Kafka 
+                    	should not be concatenated together, leave this value at 1.</li>
+                    <li>Default value: 1</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
+            
+            <li><strong>Message Demarcator</strong>
+                <ul>
+                    <li>Specifies the characters to use in order to demarcate multiple messages from Kafka. 
+                    	If the &lt;Batch Size&gt; property is set to 1, this value is ignored. Otherwise, this value
+                    	will be placed between each pair of subsequent messages in the batch. This property will
+                    	treat "\n" as a new-line, "\r" as a carriage return and "\t" as a tab character. All other
+                    	characters are treated as literal characters.
+                    </li>
+                    <li>Default value: \n</li>
+                    <li>Supports expression language: false</li>
+                </ul>
+            </li>
             <li><strong>Client Name</strong>
                 <ul>
                     <li>Client Name to use when communicating with Kafka</li>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
index 38256c5..29b7c17 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/main/resources/docs/org.apache.nifi.processors.kafka.PutKafka/index.html
@@ -31,6 +31,16 @@
         	&lt;Kafka Key&gt; Property.
         </p>
 
+		<p>
+			The Processor allows the user to configure an optional Message Delimiter that
+			can be used to send many messages per FlowFile. For example, a \n could be used
+			to indicate that the contents of the FlowFile should be sent as one message
+			per line of text. If the property is not set, the entire contents of the FlowFile
+			will be sent as a single message. When using the delimiter, if some messages are
+			successfully sent but other messages fail to send, the FlowFile will be FORKed into
+			two child FlowFiles, with the successfully sent messages being routed to 'success'
+			and the messages that could not be sent going to 'failure'.
+		</p>
 
         <p>
             <strong>Properties:</strong>
@@ -45,7 +55,7 @@
                 <ul>
                     <li>
                     	A comma-separated list of known Kafka Brokers in the format 
-                    	&lgt;host&gt;:&lt;port&gt;. This list does not need to be
+                    	&lt;host&gt;:&lt;port&gt;. This list does not need to be
                     	exhaustive but provides a mechanism for determining which
                     	other nodes belong to the Kafka cluster.
                     </li>
@@ -106,6 +116,18 @@
                     <li>Supports expression language: false</li>
                 </ul>
             </li>
+            <li>Message Delimiter
+                <ul>
+                    <li>
+                    	Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. 
+                    	If not specified, the entire content of the FlowFile will be used as a single message.
+                    	If specified, the contents of the FlowFile will be split on this delimiter and each section 
+						sent as a separate Kafka message.
+                    </li>
+                    <li>Default value: no default</li>
+                    <li>Supports expression language: true</li>
+                </ul>
+            </li>
             <li><strong>Communications Timeout</strong>
                 <ul>
                     <li>
@@ -116,16 +138,10 @@
                     <li>Supports expression language: false</li>
                 </ul>
             </li>
-            <li><strong>Max FlowFile Size</strong>
+            <li><strong>Max Buffer Size</strong>
                 <ul>
                     <li>
-                    	Specifies the amount of data that can be buffered to send to Kafka. Because
-                    	the contents of the FlowFile must be buffered into memory before they can
-                    	be sent to Kafka, attempting to send a very large FlowFile can cause
-                    	problems by causing the machine to run out of memory.
-                    	This helps to prevent the system from running out of memory, the PutKafka
-                    	Processor exposes a property for specifying the maximum size of a FlowFile.
-                    	If the size of a FlowFile is larger than this, that FlowFile will be routed to 'reject'.
+                    	The maximum amount of data to buffer in memory before sending to Kafka
                     </li>
                     <li>Default value: 1 MB</li>
                     <li>Supports expression language: false</li>
@@ -148,25 +164,21 @@
             <li>success
                 <ul>
                     <li>All FlowFiles that are successfully sent to Kafka are routed 
-                    	to this relationship.
-                    </li>
-                </ul>
-            </li>
-            
-            <li>reject
-                <ul>
-                    <li>Any FlowFile whose content size exceeds the configured value for 
-                    	the &lt;Max FlowFile Size&gt; property will be routed to this 
-                    	relationship.
+                    	to this relationship. If using the &lt;Message Delimiter&gt; property,
+                    	it's possible for some messages to be sent while others fail. In this
+                    	case, only the messages that are successfully sent will be routed to
+                    	this Relationship while the other messages will be routed to the
+                    	'failure' relationship.
                     </li>
                 </ul>
             </li>
             
             <li>failure
                 <ul>
-                    <li>All FlowFiles that cannot be sent to Kafka for any reason other 
-                    	than their content size exceeding the value of the &lt;Max FlowFile 
-                    	Size&gt; property will be routed to this relationship.
+                    <li>All FlowFiles that cannot be sent to Kafka for any reason will be routed 
+                    	to this relationship. If a portion of a FlowFile is successfully sent
+                    	to Kafka but not all, only those messages that cannot be sent to Kafka
+                    	will be routed to this Relationship.
                     </li>
                 </ul>
             </li>

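The partial-failure split described in these docs comes down to simple byte arithmetic:
PutKafka remembers the offset into the FlowFile content of the last message that was sent
successfully, and everything before that offset is cloned to 'success' while the remainder
goes to 'failure'. A small hypothetical illustration, using the same data as the
testPartialFailure unit test below:

    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    public class SplitSketch {
        public static void main(final String[] args) {
            final byte[] content = "1\n2\n3\n4".getBytes(StandardCharsets.UTF_8);
            final long lastMessageOffset = 4;  // bytes consumed after "1\n2\n" was sent successfully

            final byte[] success = Arrays.copyOfRange(content, 0, (int) lastMessageOffset);
            final byte[] failure = Arrays.copyOfRange(content, (int) lastMessageOffset, content.length);

            System.out.println("to success: " + new String(success, StandardCharsets.UTF_8));  // 1\n2\n
            System.out.println("to failure: " + new String(failure, StandardCharsets.UTF_8));  // 3\n4
        }
    }
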
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
index 2199a9c..10560f8 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestGetKafka.java
@@ -16,20 +16,27 @@
  */
 package org.apache.nifi.processors.kafka;
 
+import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import kafka.consumer.ConsumerIterator;
+import kafka.message.MessageAndMetadata;
+
 import org.apache.log4j.BasicConfigurator;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
-@Ignore("Intended only for local tests to verify functionality.")
 public class TestGetKafka {
 
-	public static final String ZOOKEEPER_CONNECTION = "192.168.0.101:2181";
 	
     @BeforeClass
     public static void configureLogging() {
@@ -39,9 +46,10 @@ public class TestGetKafka {
     }
     
     @Test
+    @Ignore("Intended only for local tests to verify functionality.")
     public void testIntegrationLocally() {
         final TestRunner runner = TestRunners.newTestRunner(GetKafka.class);
-        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, ZOOKEEPER_CONNECTION);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "192.168.0.101:2181");
         runner.setProperty(GetKafka.TOPIC, "testX");
         runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
         runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
@@ -56,4 +64,99 @@ public class TestGetKafka {
         }
     }
     
+    
+    @Test
+    public void testWithDelimiter() {
+        final List<String> messages = new ArrayList<>();
+        messages.add("Hello");
+        messages.add("Good-bye");
+        
+        final TestableProcessor proc = new TestableProcessor(null, messages);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+        runner.setProperty(GetKafka.BATCH_SIZE, "2");
+        
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+        mff.assertContentEquals("Hello\nGood-bye");
+    }
+    
+    @Test
+    public void testWithDelimiterAndNotEnoughMessages() {
+        final List<String> messages = new ArrayList<>();
+        messages.add("Hello");
+        messages.add("Good-bye");
+        
+        final TestableProcessor proc = new TestableProcessor(null, messages);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(GetKafka.ZOOKEEPER_CONNECTION_STRING, "localhost:2181");
+        runner.setProperty(GetKafka.TOPIC, "testX");
+        runner.setProperty(GetKafka.KAFKA_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.ZOOKEEPER_TIMEOUT, "3 secs");
+        runner.setProperty(GetKafka.MESSAGE_DEMARCATOR, "\\n");
+        runner.setProperty(GetKafka.BATCH_SIZE, "3");
+        
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(GetKafka.REL_SUCCESS, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(GetKafka.REL_SUCCESS).get(0);
+        mff.assertContentEquals("Hello\nGood-bye");
+    }
+    
+    
+    private static class TestableProcessor extends GetKafka {
+        private final byte[] key;
+        private final Iterator<String> messageItr;
+        
+        public TestableProcessor(final byte[] key, final List<String> messages) {
+            this.key = key;
+            messageItr = messages.iterator();
+        }
+        
+        @Override
+        public void createConsumers(ProcessContext context) {
+        }
+        
+        @Override
+        @SuppressWarnings({"unchecked", "rawtypes"})
+        protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
+            final ConsumerIterator<byte[], byte[]> itr = Mockito.mock(ConsumerIterator.class);
+            
+            Mockito.doAnswer(new Answer<Boolean>() {
+                @Override
+                public Boolean answer(final InvocationOnMock invocation) throws Throwable {
+                    return messageItr.hasNext();
+                }
+            }).when(itr).hasNext();
+            
+            Mockito.doAnswer(new Answer<MessageAndMetadata>() {
+                @Override
+                public MessageAndMetadata answer(InvocationOnMock invocation) throws Throwable {
+                    final MessageAndMetadata mam = Mockito.mock(MessageAndMetadata.class);
+                    Mockito.when(mam.key()).thenReturn(key);
+                    Mockito.when(mam.offset()).thenReturn(0L);
+                    Mockito.when(mam.partition()).thenReturn(0);
+                    
+                    Mockito.doAnswer(new Answer<byte[]>() {
+                        @Override
+                        public byte[] answer(InvocationOnMock invocation) throws Throwable {
+                            return messageItr.next().getBytes();
+                        }
+                        
+                    }).when(mam).message();
+                    
+                    return mam;
+                }
+            }).when(itr).next();
+            
+            return itr;
+        }
+    }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c91c7e78/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
----------------------------------------------------------------------
diff --git a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
index 2e6aacf..cf7ed68 100644
--- a/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
+++ b/nar-bundles/kafka-bundle/kafka-processors/src/test/java/org/apache/nifi/processors/kafka/TestPutKafka.java
@@ -1,12 +1,22 @@
 package org.apache.nifi.processors.kafka;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import kafka.common.FailedToSendMessageException;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.annotation.OnScheduled;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -14,10 +24,109 @@ import org.junit.Ignore;
 import org.junit.Test;
 
 
-@Ignore("Intended only for local testing to verify functionality.")
 public class TestPutKafka {
 
+    @Test
+    public void testMultipleKeyValuePerFlowFile() {
+        final TestableProcessor proc = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        
+        runner.enqueue("Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9".getBytes());
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+        
+        final List<byte[]> messages = proc.getProducer().getMessages();
+        assertEquals(11, messages.size());
+        
+        assertTrue(Arrays.equals("Hello World".getBytes(StandardCharsets.UTF_8), messages.get(0)));
+        assertTrue(Arrays.equals("Goodbye".getBytes(StandardCharsets.UTF_8), messages.get(1)));
+        assertTrue(Arrays.equals("1".getBytes(StandardCharsets.UTF_8), messages.get(2)));
+        assertTrue(Arrays.equals("2".getBytes(StandardCharsets.UTF_8), messages.get(3)));
+        assertTrue(Arrays.equals("3".getBytes(StandardCharsets.UTF_8), messages.get(4)));
+        assertTrue(Arrays.equals("4".getBytes(StandardCharsets.UTF_8), messages.get(5)));
+        assertTrue(Arrays.equals("5".getBytes(StandardCharsets.UTF_8), messages.get(6)));
+        assertTrue(Arrays.equals("6".getBytes(StandardCharsets.UTF_8), messages.get(7)));
+        assertTrue(Arrays.equals("7".getBytes(StandardCharsets.UTF_8), messages.get(8)));
+        assertTrue(Arrays.equals("8".getBytes(StandardCharsets.UTF_8), messages.get(9)));
+        assertTrue(Arrays.equals("9".getBytes(StandardCharsets.UTF_8), messages.get(10)));
+    }
+    
+    
+    @Test
+    public void testWithImmediateFailure() {
+        final TestableProcessor proc = new TestableProcessor(0);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        
+        final String text = "Hello World\nGoodbye\n1\n2\n3\n4\n5\n6\n7\n8\n9";
+        runner.enqueue(text.getBytes());
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_FAILURE, 1);
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        mff.assertContentEquals(text);
+    }
+    
+    
+    @Test
+    public void testPartialFailure() {
+        final TestableProcessor proc = new TestableProcessor(2);
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        runner.setProperty(PutKafka.MAX_BUFFER_SIZE, "1 B");
+        
+        final byte[] bytes = "1\n2\n3\n4".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+        
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 1);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+        final MockFlowFile successFF = runner.getFlowFilesForRelationship(PutKafka.REL_SUCCESS).get(0);
+        successFF.assertContentEquals("1\n2\n");
+        
+        final MockFlowFile failureFF = runner.getFlowFilesForRelationship(PutKafka.REL_FAILURE).get(0);
+        failureFF.assertContentEquals("3\n4");
+    }
+    
+    
+    @Test
+    public void testWithEmptyMessages() {
+        final TestableProcessor proc = new TestableProcessor();
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setProperty(PutKafka.TOPIC, "topic1");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:1234");
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\\n");
+        
+        final byte[] bytes = "\n\n\n1\n2\n\n\n\n3\n4\n\n\n".getBytes();
+        runner.enqueue(bytes);
+        runner.run();
+        
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+
+        final List<byte[]> msgs = proc.getProducer().getMessages();
+        assertEquals(4, msgs.size());
+        assertTrue(Arrays.equals("1".getBytes(), msgs.get(0)));
+        assertTrue(Arrays.equals("2".getBytes(), msgs.get(1)));
+        assertTrue(Arrays.equals("3".getBytes(), msgs.get(2)));
+        assertTrue(Arrays.equals("4".getBytes(), msgs.get(3)));
+    }
+    
+    
 	@Test
+	@Ignore("Intended only for local testing; requires an actual running instance of Kafka & ZooKeeper...")
 	public void testKeyValuePut() {
 		final TestRunner runner = TestRunners.newTestRunner(PutKafka.class);
 		runner.setProperty(PutKafka.SEED_BROKERS, "192.168.0.101:9092");
@@ -45,4 +154,67 @@ public class TestPutKafka {
 		assertTrue(Arrays.equals(data, mff.toByteArray()));
 	}
 	
+	
+	private static class TestableProcessor extends PutKafka {
+	    private MockProducer producer;
+	    private int failAfter = Integer.MAX_VALUE;
+	    
+	    public TestableProcessor() {
+	    }
+	    
+	    public TestableProcessor(final int failAfter) {
+	        this.failAfter = failAfter;
+	    }
+	    
+	    @OnScheduled
+	    public void instantiateProducer(final ProcessContext context) {
+	        producer = new MockProducer(createConfig(context));
+	        producer.setFailAfter(failAfter);
+	    }
+	    
+	    @Override
+	    protected Producer<byte[], byte[]> createProducer(final ProcessContext context) {
+	        return producer;
+	    }
+	    
+	    public MockProducer getProducer() {
+	        return producer;
+	    }
+	}
+	
+	
+	private static class MockProducer extends Producer<byte[], byte[]> {
+	    private int sendCount = 0;
+	    private int failAfter = Integer.MAX_VALUE;
+	    
+	    private final List<byte[]> messages = new ArrayList<>();
+	    
+        public MockProducer(final ProducerConfig config) {
+            super(config);
+        }
+	    
+        @Override
+        public void send(final KeyedMessage<byte[], byte[]> message) {
+            if ( ++sendCount > failAfter ) {
+                throw new FailedToSendMessageException("Failed to send message", new RuntimeException("Unit test told to fail after " + failAfter + " successful messages"));
+            } else {
+                messages.add(message.message());
+            }
+        }
+        
+        public List<byte[]> getMessages() {
+            return messages;
+        }
+        
+        @Override
+        public void send(final List<KeyedMessage<byte[], byte[]>> messages) {
+            for ( final KeyedMessage<byte[], byte[]> msg : messages ) {
+                send(msg);
+            }
+        }
+        
+        public void setFailAfter(final int successCount) {
+            failAfter = successCount;
+        }
+	}
 }