You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/01 20:42:40 UTC

[nifi] 09/18: NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch NIFI-6169-RC1
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit e5ddae54efe229a2eb033a694b6c82c3ebf62018
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 20 12:16:16 2019 +0900

    NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
    
    Added a unit test reproducing the fixed issue,
    and updated the existing testDefragment test to illustrate
    the remaining FlowFiles whose fragment bundles are still incomplete.
---
 .../nifi/processors/standard/TestMergeRecord.java  | 63 ++++++++++++++++++++--
 1 file changed, 59 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index c54bf2a..3540b04 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -158,27 +158,39 @@ public class TestMergeRecord {
         final Map<String, String> attr1 = new HashMap<>();
         attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
         attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
 
         final Map<String, String> attr2 = new HashMap<>();
         attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
         attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
 
         final Map<String, String> attr3 = new HashMap<>();
         attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
         attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
 
         final Map<String, String> attr4 = new HashMap<>();
         attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
-        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
+        attr4.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+
+        final Map<String, String> attr5 = new HashMap<>();
+        attr5.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr5.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
+        attr5.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
 
         runner.enqueue("Name, Age\nJohn, 35", attr1);
         runner.enqueue("Name, Age\nJane, 34", attr2);
 
-        runner.enqueue("Name, Age\nJake, 3", attr3);
-        runner.enqueue("Name, Age\nJan, 2", attr4);
+        runner.enqueue("Name, Age\nJay, 24", attr3);
+
+        runner.enqueue("Name, Age\nJake, 3", attr4);
+        runner.enqueue("Name, Age\nJan, 2", attr5);
 
-        runner.run(4);
+        runner.run(1);
 
+        assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
         runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
 
@@ -196,6 +208,49 @@ public class TestMergeRecord {
 
 
     @Test
+    public void testDefragmentWithMultipleRecords() {
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+        // NIFI-5918 regression test: each FlowFile holds 2 records (record.count=2) and belongs to a 2-fragment bundle.
+        final Map<String, String> attr1 = new HashMap<>();
+        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+        attr1.put("record.count", "2");
+
+        final Map<String, String> attr2 = new HashMap<>();
+        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
+        attr2.put("record.count", "2");
+        // Fragment id=2 is incomplete: only index 0 of its 2 fragments is enqueued below.
+        final Map<String, String> attr3 = new HashMap<>();
+        attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+        attr3.put("record.count", "2");
+        // Fragment id=1 is complete (indices 0 and 1); merged, it holds 4 records although fragment.count is 2.
+        runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
+
+        runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
+
+        runner.enqueue("Name, Age\nJay, 24\nJade, 28", attr3);
+        // Single trigger: only bundle id=1 should merge; its 2 source FlowFiles go to 'original'.
+        runner.run(1);
+
+        assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+        // Exactly one merged FlowFile must report record.count=4 and contain all 4 records of fragment id=1.
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "4".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray())))
+            .count());
+
+    }
+
+
+    @Test
     public void testMinSize() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "2");
         runner.setProperty(MergeRecord.MAX_RECORDS, "2");