You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/01/05 15:43:06 UTC

nifi git commit: NIFI-4724: Support 0 byte message with PublishKafka Before this fix, PublishKafka (0.9) and PublishKafka_0_10 fail with empty incoming FlowFiles due to 'transfer relationship not specified' error. Because the internal 'publish' method is

Repository: nifi
Updated Branches:
  refs/heads/master 3b10a8479 -> e5ed62a98


NIFI-4724: Support 0 byte message with PublishKafka
Before this fix, PublishKafka (0.9) and PublishKafka_0_10 failed on empty incoming FlowFiles with a 'transfer relationship not specified' error,
because the internal 'publish' method was never called: StreamDemarcator does not emit any token for them, regardless of whether a demarcator is set.

As for PublishKafka_0_11 and PublishKafka_1_0, empty FlowFiles are transferred to the 'success' relationship; however, no message is sent to Kafka.

Since Kafka allows empty messages with a 0-byte body, NiFi should be able to send them, too.

This commit changes the current behavior described above to the following, for all PublishKafka_* processors:

- If a demarcator is not set, publish the incoming FlowFile content as is. This enables sending an empty Kafka message.
- If a demarcator is set, send each token as a separate message.
  Even if no token is found (empty incoming FlowFile), transfer the FlowFile to 'success'.

This closes #2362.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e5ed62a9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e5ed62a9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e5ed62a9

Branch: refs/heads/master
Commit: e5ed62a98fe0c15fcba865edfe00eac673a6ac51
Parents: 3b10a84
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Dec 27 17:38:57 2017 +0900
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 5 10:42:58 2018 -0500

----------------------------------------------------------------------
 .../kafka/pubsub/InFlightMessageTracker.java    |  4 ++
 .../processors/kafka/pubsub/PublisherLease.java | 18 ++++++-
 .../kafka/pubsub/TestPublisherLease.java        | 55 ++++++++++++++++++-
 .../kafka/pubsub/InFlightMessageTracker.java    |  4 ++
 .../processors/kafka/pubsub/PublisherLease.java | 18 ++++++-
 .../kafka/pubsub/TestPublisherLease.java        | 55 ++++++++++++++++++-
 .../kafka/pubsub/InFlightMessageTracker.java    |  8 +++
 .../processors/kafka/pubsub/PublisherLease.java | 18 ++++++-
 .../kafka/pubsub/TestPublisherLease.java        | 56 +++++++++++++++++++-
 .../kafka/pubsub/InFlightMessageTracker.java    |  4 ++
 .../processors/kafka/pubsub/PublisherLease.java | 17 +++++-
 .../kafka/pubsub/TestPublisherLease.java        | 54 ++++++++++++++++++-
 12 files changed, 299 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 58157d9..bdb419c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -42,6 +42,10 @@ public class InFlightMessageTracker {
         }
     }
 
+    /**
+     * This method guarantees that the specified FlowFile to be transferred to
+     * 'success' relationship even if it did not derive any Kafka message.
+     */
     public void trackEmpty(final FlowFile flowFile) {
         messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index c17c331..ea1c087 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -38,6 +38,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
@@ -71,9 +72,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker();
         }
 
-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send FlowFile content as it is, to support sending 0 byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
 
@@ -82,6 +95,7 @@ public class PublisherLease implements Closeable {
                         return;
                     }
                 }
+                tracker.trackEmpty(flowFile);
             } catch (final TokenTooLargeException ttle) {
                 tracker.fail(flowFile, ttle);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 836a4b3..ac204f1 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -81,7 +81,9 @@ public class TestPublisherLease {
             }
         };
 
-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size grater than zero to make the lease reads the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -206,6 +208,57 @@ public class TestPublisherLease {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
+
+    @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
         final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 317b274..3ec3d1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -45,6 +45,10 @@ public class InFlightMessageTracker {
         }
     }
 
+    /**
+     * This method guarantees that the specified FlowFile to be transferred to
+     * 'success' relationship even if it did not derive any Kafka message.
+     */
     public void trackEmpty(final FlowFile flowFile) {
         messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index e8d744a..72c90d2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
@@ -113,9 +114,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker(logger);
         }
 
-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send FlowFile content as it is, to support sending 0 byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
 
@@ -123,6 +136,7 @@ public class PublisherLease implements Closeable {
                         // If we have a failure, don't try to send anything else.
                         return;
                     }
+                    tracker.trackEmpty(flowFile);
                 }
             } catch (final TokenTooLargeException ttle) {
                 tracker.fail(flowFile, ttle);

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 105a4d5..d2b52dd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -80,7 +80,9 @@ public class TestPublisherLease {
             }
         };
 
-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size grater than zero to make the lease reads the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -202,6 +204,57 @@ public class TestPublisherLease {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
+
+    @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {
         final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index e7d5cb7..bdb419c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -42,6 +42,14 @@ public class InFlightMessageTracker {
         }
     }
 
+    /**
+     * This method guarantees that the specified FlowFile to be transferred to
+     * 'success' relationship even if it did not derive any Kafka message.
+     */
+    public void trackEmpty(final FlowFile flowFile) {
+        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+    }
+
     public int getAcknowledgedCount(final FlowFile flowFile) {
         final Counts counter = messageCountsByFlowFile.get(flowFile);
         return (counter == null) ? 0 : counter.getAcknowledgedCount();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index 8fb4e67..ca9537c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -29,6 +29,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
@@ -61,9 +62,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker();
         }
 
-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send FlowFile content as it is, to support sending 0 byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
 
@@ -72,6 +85,7 @@ public class PublisherLease implements Closeable {
                         return;
                     }
                 }
+                tracker.trackEmpty(flowFile);
             } catch (final TokenTooLargeException ttle) {
                 tracker.fail(flowFile, ttle);
             }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index c2d143c..b67cefe 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -68,7 +68,9 @@ public class TestPublisherLease {
             }
         };
 
-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size grater than zero to make the lease reads the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -191,4 +193,56 @@ public class TestPublisherLease {
 
         verify(producer, times(1)).flush();
     }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index 317b274..3ec3d1c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -45,6 +45,10 @@ public class InFlightMessageTracker {
         }
     }
 
+    /**
+     * This method guarantees that the specified FlowFile to be transferred to
+     * 'success' relationship even if it did not derive any Kafka message.
+     */
     public void trackEmpty(final FlowFile flowFile) {
         messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index e8d744a..2b1cfe2 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -42,6 +42,7 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.stream.io.exception.TokenTooLargeException;
 import org.apache.nifi.stream.io.util.StreamDemarcator;
 
@@ -113,9 +114,21 @@ public class PublisherLease implements Closeable {
             tracker = new InFlightMessageTracker(logger);
         }
 
-        try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
+        try {
             byte[] messageContent;
-            try {
+            if (demarcatorBytes == null || demarcatorBytes.length == 0) {
+                if (flowFile.getSize() > maxMessageSize) {
+                    tracker.fail(flowFile, new TokenTooLargeException("A message in the stream exceeds the maximum allowed message size of " + maxMessageSize + " bytes."));
+                    return;
+                }
+                // Send FlowFile content as it is, to support sending 0 byte message.
+                messageContent = new byte[(int) flowFile.getSize()];
+                StreamUtils.fillBuffer(flowFileContent, messageContent);
+                publish(flowFile, messageKey, messageContent, topic, tracker);
+                return;
+            }
+
+            try (final StreamDemarcator demarcator = new StreamDemarcator(flowFileContent, demarcatorBytes, maxMessageSize)) {
                 while ((messageContent = demarcator.nextToken()) != null) {
                     publish(flowFile, messageKey, messageContent, topic, tracker);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e5ed62a9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
index 64451d5..b2e1b0e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublisherLease.java
@@ -79,7 +79,9 @@ public class TestPublisherLease {
             }
         };
 
-        final FlowFile flowFile = new MockFlowFile(1L);
+        final FlowFile flowFile = Mockito.spy(new MockFlowFile(1L));
+        // Need a size grater than zero to make the lease reads the InputStream.
+        Mockito.when(flowFile.getSize()).thenReturn(1L);
         final String topic = "unit-test";
         final byte[] messageKey = null;
         final byte[] demarcatorBytes = null;
@@ -200,6 +202,56 @@ public class TestPublisherLease {
         verify(producer, times(1)).flush();
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testZeroByteMessageSent() throws IOException {
+        final AtomicInteger poisonCount = new AtomicInteger(0);
+
+        final PublisherLease lease = new PublisherLease(producer, 1024 * 1024, 10L, logger, true, null, StandardCharsets.UTF_8) {
+            @Override
+            protected void poison() {
+                poisonCount.incrementAndGet();
+                super.poison();
+            }
+        };
+
+        final AtomicInteger correctMessages = new AtomicInteger(0);
+        final AtomicInteger incorrectMessages = new AtomicInteger(0);
+        doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                final ProducerRecord<byte[], byte[]> record = invocation.getArgumentAt(0, ProducerRecord.class);
+                final byte[] value = record.value();
+                final String valueString = new String(value, StandardCharsets.UTF_8);
+                if ("".equals(valueString)) {
+                    correctMessages.incrementAndGet();
+                } else {
+                    incorrectMessages.incrementAndGet();
+                }
+
+                return null;
+            }
+        }).when(producer).send(any(ProducerRecord.class), any(Callback.class));
+
+        final FlowFile flowFile = new MockFlowFile(1L);
+        final String topic = "unit-test";
+        final byte[] messageKey = null;
+        final byte[] demarcatorBytes = null;
+
+        final byte[] flowFileContent = new byte[0];
+        lease.publish(flowFile, new ByteArrayInputStream(flowFileContent), messageKey, demarcatorBytes, topic);
+
+        assertEquals(0, poisonCount.get());
+
+        verify(producer, times(0)).flush();
+
+        final PublishResult result = lease.complete();
+
+        assertEquals(1, correctMessages.get());
+        assertEquals(0, incorrectMessages.get());
+
+        verify(producer, times(1)).flush();
+    }
 
     @Test
     public void testRecordsSentToRecordWriterAndThenToProducer() throws IOException, SchemaNotFoundException, MalformedRecordException {