You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/16 18:43:18 UTC

nifi git commit: NIFI-3909: This closes #1806. If we have a FlowFile with 0 records, ensure that PublishKafkaRecord_0_10 handles the flowfile properly

Repository: nifi
Updated Branches:
  refs/heads/master f5f6cab64 -> fb94990e6


NIFI-3909: This closes #1806. If we have a FlowFile with 0 records, ensure that PublishKafkaRecord_0_10 handles the flowfile properly

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb94990e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb94990e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb94990e

Branch: refs/heads/master
Commit: fb94990e60068eb244cae1d12d3a0790ece29adc
Parents: f5f6cab
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue May 16 10:49:17 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue May 16 14:42:42 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/InFlightMessageTracker.java    |  4 ++++
 .../processors/kafka/pubsub/PublisherLease.java |  6 +++++
 .../pubsub/TestPublishKafkaRecord_0_10.java     | 24 ++++++++++++++++++++
 3 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/fb94990e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index e7d5cb7..58157d9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -42,6 +42,10 @@ public class InFlightMessageTracker {
         }
     }
 
+    public void trackEmpty(final FlowFile flowFile) {
+        messageCountsByFlowFile.putIfAbsent(flowFile, new Counts());
+    }
+
     public int getAcknowledgedCount(final FlowFile flowFile) {
         final Counts counter = messageCountsByFlowFile.get(flowFile);
         return (counter == null) ? 0 : counter.getAcknowledgedCount();

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb94990e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index f08f7a9..be2697b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -102,9 +102,11 @@ public class PublisherLease implements Closeable {
 
         Record record;
         final RecordSet recordSet = reader.createRecordSet();
+        int recordCount = 0;
 
         try {
             while ((record = recordSet.next()) != null) {
+                recordCount++;
                 baos.reset();
                 writer.write(record, baos);
 
@@ -119,6 +121,10 @@ public class PublisherLease implements Closeable {
                     return;
                 }
             }
+
+            if (recordCount == 0) {
+                tracker.trackEmpty(flowFile);
+            }
         } catch (final TokenTooLargeException ttle) {
             tracker.fail(flowFile, ttle);
         } catch (final Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/fb94990e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
index c1df792..8c6efb7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -191,6 +191,30 @@ public class TestPublishKafkaRecord_0_10 {
             .count());
     }
 
+    @Test
+    public void testNoRecordsInFlowFile() throws IOException {
+        final List<FlowFile> flowFiles = new ArrayList<>();
+        flowFiles.add(runner.enqueue(new byte[0]));
+
+        final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+        msgCounts.put(flowFiles.get(0), 0);
+
+        final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap());
+
+        when(mockLease.complete()).thenReturn(result);
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1);
+
+        verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME));
+        verify(mockLease, times(1)).complete();
+        verify(mockLease, times(0)).poison();
+        verify(mockLease, times(1)).close();
+
+        final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).get(0);
+        mff.assertAttributeEquals("msg.count", "0");
+    }
+
 
     @Test
     public void testSomeSuccessSomeFailure() throws IOException {