You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/16 18:43:18 UTC
nifi git commit: NIFI-3909: This closes #1806. If we have a FlowFile with 0 records, ensure that PublishKafkaRecord_0_10 handles the flowfile properly
Repository: nifi
Updated Branches:
refs/heads/master f5f6cab64 -> fb94990e6
NIFI-3909: This closes #1806. If we have a FlowFile with 0 records, ensure that PublishKafkaRecord_0_10 handles the flowfile properly
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/fb94990e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/fb94990e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/fb94990e
Branch: refs/heads/master
Commit: fb94990e60068eb244cae1d12d3a0790ece29adc
Parents: f5f6cab
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue May 16 10:49:17 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue May 16 14:42:42 2017 -0400
----------------------------------------------------------------------
.../kafka/pubsub/InFlightMessageTracker.java | 4 ++++
.../processors/kafka/pubsub/PublisherLease.java | 6 +++++
.../pubsub/TestPublishKafkaRecord_0_10.java | 24 ++++++++++++++++++++
3 files changed, 34 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb94990e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
index e7d5cb7..58157d9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/InFlightMessageTracker.java
@@ -42,6 +42,10 @@ public class InFlightMessageTracker {
}
}
+ public void trackEmpty(final FlowFile flowFile) { // Registers a FlowFile for which no messages were sent (PublisherLease calls this when its record count is 0).
+ messageCountsByFlowFile.putIfAbsent(flowFile, new Counts()); // Adds a fresh Counts only when the FlowFile has no entry yet; an existing entry is left untouched.
+ }
+
public int getAcknowledgedCount(final FlowFile flowFile) {
final Counts counter = messageCountsByFlowFile.get(flowFile);
return (counter == null) ? 0 : counter.getAcknowledgedCount();
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb94990e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
index f08f7a9..be2697b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublisherLease.java
@@ -102,9 +102,11 @@ public class PublisherLease implements Closeable {
Record record;
final RecordSet recordSet = reader.createRecordSet();
+ int recordCount = 0;
try {
while ((record = recordSet.next()) != null) {
+ recordCount++;
baos.reset();
writer.write(record, baos);
@@ -119,6 +121,10 @@ public class PublisherLease implements Closeable {
return;
}
}
+
+ if (recordCount == 0) {
+ tracker.trackEmpty(flowFile);
+ }
} catch (final TokenTooLargeException ttle) {
tracker.fail(flowFile, ttle);
} catch (final Exception e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/fb94990e/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
index c1df792..8c6efb7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestPublishKafkaRecord_0_10.java
@@ -191,6 +191,30 @@ public class TestPublishKafkaRecord_0_10 {
.count());
}
+ @Test
+ public void testNoRecordsInFlowFile() throws IOException { // Checks that a FlowFile with empty content is routed to success with msg.count = 0.
+ final List<FlowFile> flowFiles = new ArrayList<>();
+ flowFiles.add(runner.enqueue(new byte[0])); // Enqueue a single zero-byte FlowFile.
+
+ final Map<FlowFile, Integer> msgCounts = new HashMap<>();
+ msgCounts.put(flowFiles.get(0), 0); // Expected message count for that FlowFile is 0.
+
+ final PublishResult result = createPublishResult(msgCounts, new HashSet<>(flowFiles), Collections.emptyMap()); // presumably (counts, successful FlowFiles, failures) - TODO confirm against createPublishResult
+
+ when(mockLease.complete()).thenReturn(result); // Stub the mocked lease so complete() hands back the prepared result.
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(PublishKafkaRecord_0_10.REL_SUCCESS, 1); // Exactly one FlowFile, and it went to REL_SUCCESS.
+
+ verify(mockLease, times(1)).publish(any(FlowFile.class), any(RecordReader.class), any(RecordWriter.class), eq(null), eq(TOPIC_NAME)); // publish() is invoked exactly once, with a null fourth argument and TOPIC_NAME as the fifth.
+ verify(mockLease, times(1)).complete();
+ verify(mockLease, times(0)).poison(); // The lease's poison() must never be called.
+ verify(mockLease, times(1)).close();
+
+ final MockFlowFile mff = runner.getFlowFilesForRelationship(PublishKafkaRecord_0_10.REL_SUCCESS).get(0);
+ mff.assertAttributeEquals("msg.count", "0"); // The output FlowFile's msg.count attribute is "0".
+ }
+
@Test
public void testSomeSuccessSomeFailure() throws IOException {