Posted to commits@nifi.apache.org by ij...@apache.org on 2019/03/20 03:38:22 UTC

[nifi] branch master updated (9d21a10 -> cd35678)

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from 9d21a10  NIFI-6120 Move nifi.registry.version property to root pom so it can be shared between nifi-framework and nifi-toolkit
     new b97fbd2  NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
     new cd35678  NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../nifi/processors/standard/merge/RecordBin.java  | 55 ++++++++-----------
 .../standard/merge/RecordBinManager.java           | 10 ++--
 .../standard/merge/RecordBinThresholds.java        | 10 ++--
 .../nifi/processors/standard/TestMergeRecord.java  | 63 ++++++++++++++++++++--
 4 files changed, 90 insertions(+), 48 deletions(-)


[nifi] 02/02: NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit cd3567873be7586aa285bd6b27e928e320299bee
Author: Koji Kawamura <ij...@apache.org>
AuthorDate: Wed Mar 20 12:16:16 2019 +0900

    NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
    
    Added a unit test representing the fixed issue,
    and updated the existing testDefragment test to illustrate
    the remaining FlowFiles that did not meet the threshold.
---
 .../nifi/processors/standard/TestMergeRecord.java  | 63 ++++++++++++++++++++--
 1 file changed, 59 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index c54bf2a..3540b04 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -158,27 +158,39 @@ public class TestMergeRecord {
         final Map<String, String> attr1 = new HashMap<>();
         attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
         attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
 
         final Map<String, String> attr2 = new HashMap<>();
         attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
         attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
 
         final Map<String, String> attr3 = new HashMap<>();
         attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
         attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
 
         final Map<String, String> attr4 = new HashMap<>();
         attr4.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
-        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+        attr4.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
+        attr4.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+
+        final Map<String, String> attr5 = new HashMap<>();
+        attr5.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr5.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "3");
+        attr5.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
 
         runner.enqueue("Name, Age\nJohn, 35", attr1);
         runner.enqueue("Name, Age\nJane, 34", attr2);
 
-        runner.enqueue("Name, Age\nJake, 3", attr3);
-        runner.enqueue("Name, Age\nJan, 2", attr4);
+        runner.enqueue("Name, Age\nJay, 24", attr3);
+
+        runner.enqueue("Name, Age\nJake, 3", attr4);
+        runner.enqueue("Name, Age\nJan, 2", attr5);
 
-        runner.run(4);
+        runner.run(1);
 
+        assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
         runner.assertTransferCount(MergeRecord.REL_MERGED, 2);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
 
@@ -196,6 +208,49 @@ public class TestMergeRecord {
 
 
     @Test
+    public void testDefragmentWithMultipleRecords() {
+        runner.setProperty(MergeRecord.MERGE_STRATEGY, MergeRecord.MERGE_STRATEGY_DEFRAGMENT);
+
+        final Map<String, String> attr1 = new HashMap<>();
+        attr1.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr1.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr1.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+        attr1.put("record.count", "2");
+
+        final Map<String, String> attr2 = new HashMap<>();
+        attr2.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr2.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "1");
+        attr2.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "1");
+        attr2.put("record.count", "2");
+
+        final Map<String, String> attr3 = new HashMap<>();
+        attr3.put(MergeRecord.FRAGMENT_COUNT_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_ID_ATTRIBUTE, "2");
+        attr3.put(MergeRecord.FRAGMENT_INDEX_ATTRIBUTE, "0");
+        attr3.put("record.count", "2");
+
+        runner.enqueue("Name, Age\nJohn, 35\nJane, 34", attr1);
+
+        runner.enqueue("Name, Age\nJake, 3\nJan, 2", attr2);
+
+        runner.enqueue("Name, Age\nJay, 24\nJade, 28", attr3);
+
+        runner.run(1);
+
+        assertEquals("Fragment id=2 should remain in the incoming connection", 1, runner.getQueueSize().getObjectCount());
+        runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
+        runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 2);
+
+        final List<MockFlowFile> mffs = runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED);
+        assertEquals(1L, mffs.stream()
+            .filter(ff -> "4".equals(ff.getAttribute("record.count")))
+            .filter(ff -> "header\nJohn,35\nJane,34\nJake,3\nJan,2\n".equals(new String(ff.toByteArray())))
+            .count());
+
+    }
+
+
+    @Test
     public void testMinSize() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "2");
         runner.setProperty(MergeRecord.MAX_RECORDS, "2");
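
The updated tests assert that a fragment group whose declared fragment.count is never reached stays in the incoming connection, while complete groups are merged. Below is a minimal sketch of that completion idea, using a hypothetical simplified class (this is not the committed RecordBin code; see commit 01/02 below for the actual change to RecordBin#isFull()):

import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified stand-in for the fragment-count completion check;
// the real logic lives in RecordBin, as reworked in commit 01/02.
class DefragmentBinSketch {

    private final String fragmentCountAttribute;         // e.g. "fragment.count"
    private final List<Map<String, String>> fragments;   // attributes of the FlowFiles offered so far

    DefragmentBinSketch(final String fragmentCountAttribute, final List<Map<String, String>> fragments) {
        this.fragmentCountAttribute = fragmentCountAttribute;
        this.fragments = fragments;
    }

    boolean isComplete() {
        // Take the expected fragment count from the first FlowFile that carries the attribute.
        final Optional<String> expected = fragments.stream()
                .map(attrs -> attrs.get(fragmentCountAttribute))
                .filter(value -> value != null)
                .findFirst();
        if (!expected.isPresent()) {
            return false;                                 // no declared count yet; keep binning
        }
        try {
            // Complete once as many fragments have been offered as the attribute declares.
            return fragments.size() == Integer.parseInt(expected.get());
        } catch (final NumberFormatException nfe) {
            return false;                                 // malformed count; keep waiting
        }
    }

    public static void main(final String[] args) {
        final List<Map<String, String>> twoOfTwo = List.of(
                Map.of("fragment.count", "2", "fragment.index", "0"),
                Map.of("fragment.count", "2", "fragment.index", "1"));
        final List<Map<String, String>> oneOfTwo = List.of(
                Map.of("fragment.count", "2", "fragment.index", "0"));

        System.out.println(new DefragmentBinSketch("fragment.count", twoOfTwo).isComplete()); // true
        System.out.println(new DefragmentBinSketch("fragment.count", oneOfTwo).isComplete()); // false
    }
}

Groups that never reach their declared count are not merged; with runner.run(1) in the tests above they remain queued, which is why the tests assert one FlowFile still sitting in the incoming connection.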


[nifi] 01/02: NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on

Posted by ij...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit b97fbd2c89caafec4bde485c6118a6daa087fcb2
Author: Andres Garagiola <an...@gmail.com>
AuthorDate: Fri Feb 22 17:15:59 2019 -0300

    NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
    
    This closes #3334.
    
    Signed-off-by: Andres Garagiola <an...@gmail.com>
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi/processors/standard/merge/RecordBin.java  | 55 +++++++++-------------
 .../standard/merge/RecordBinManager.java           | 10 ++--
 .../standard/merge/RecordBinThresholds.java        | 10 ++--
 3 files changed, 31 insertions(+), 44 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index d15ba0f..a96e119 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -63,6 +63,7 @@ public class RecordBin {
     private RecordSetWriter recordWriter;
     private ByteCountingOutputStream out;
     private int recordCount = 0;
+    private int fragmentCount = 0;
     private volatile boolean complete = false;
 
     private static final AtomicLong idGenerator = new AtomicLong(0L);
@@ -114,7 +115,7 @@ public class RecordBin {
         }
 
         boolean flowFileMigrated = false;
-
+        this.fragmentCount++;
         try {
             if (isComplete()) {
                 logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
@@ -202,26 +203,7 @@ public class RecordBin {
                 return false;
             }
 
-            int maxRecords;
-            final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
-            if (recordCountAttribute.isPresent()) {
-                final Optional<String> recordCountValue = flowFiles.stream()
-                    .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null)
-                    .map(ff -> ff.getAttribute(recordCountAttribute.get()))
-                    .findFirst();
-
-                if (!recordCountValue.isPresent()) {
-                    return false;
-                }
-
-                try {
-                    maxRecords = Integer.parseInt(recordCountValue.get());
-                } catch (final NumberFormatException e) {
-                    maxRecords = 1;
-                }
-            } else {
-                maxRecords = thresholds.getMaxRecords();
-            }
+            int maxRecords = thresholds.getMaxRecords();
 
             if (recordCount >= maxRecords) {
                 return true;
@@ -231,6 +213,22 @@ public class RecordBin {
                 return true;
             }
 
+            Optional<String> fragmentCountAttribute = thresholds.getFragmentCountAttribute();
+            if(fragmentCountAttribute != null && fragmentCountAttribute.isPresent()) {
+                final Optional<String> fragmentCountValue = flowFiles.stream()
+                        .filter(ff -> ff.getAttribute(fragmentCountAttribute.get()) != null)
+                        .map(ff -> ff.getAttribute(fragmentCountAttribute.get()))
+                        .findFirst();
+                if (fragmentCountValue.isPresent()) {
+                    try {
+                        int expectedFragments = Integer.parseInt(fragmentCountValue.get());
+                        if (this.fragmentCount == expectedFragments)
+                            return true;
+                    } catch (NumberFormatException nfe) {
+                        this.logger.error(nfe.getMessage(), nfe);
+                    }
+                }
+            }
             return false;
         } finally {
             readLock.unlock();
@@ -243,18 +241,7 @@ public class RecordBin {
             return currentCount;
         }
 
-        int requiredCount;
-        final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
-        if (recordCountAttribute.isPresent()) {
-            final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
-            try {
-                requiredCount = Integer.parseInt(recordCountValue);
-            } catch (final NumberFormatException e) {
-                requiredCount = 1;
-            }
-        } else {
-            requiredCount = thresholds.getMinRecords();
-        }
+        int requiredCount = thresholds.getMinRecords();
 
         this.requiredRecordCount = requiredCount;
         return requiredCount;
@@ -347,7 +334,7 @@ public class RecordBin {
             }
 
             // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin.
-            final Optional<String> countAttr = thresholds.getRecordCountAttribute();
+            final Optional<String> countAttr = thresholds.getFragmentCountAttribute();
             if (countAttr.isPresent()) {
                 // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value.
                 Integer expectedBinCount = null;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index d1dde2a..f0891a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -189,17 +189,17 @@ public class RecordBinManager {
 
         final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
         final String maxBinAge = maxMillisValue.getValue();
-        final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE;
+        final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS) : Long.MAX_VALUE;
 
-        final String recordCountAttribute;
+        final String fragmentCountAttribute;
         final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
         if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+            fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
         } else {
-            recordCountAttribute = null;
+            fragmentCountAttribute = null;
         }
 
-        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, fragmentCountAttribute);
     }
 
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
index 16c05ac..8f1a5c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -26,17 +26,17 @@ public class RecordBinThresholds {
     private final long maxBytes;
     private final long maxBinMillis;
     private final String maxBinAge;
-    private final Optional<String> recordCountAttribute;
+    private final Optional<String> fragmentCountAttribute;
 
     public RecordBinThresholds(final int minRecords, final int maxRecords, final long minBytes, final long maxBytes, final long maxBinMillis,
-        final String maxBinAge, final String recordCountAttribute) {
+        final String maxBinAge, final String fragmentCountAttribute) {
         this.minRecords = minRecords;
         this.maxRecords = maxRecords;
         this.minBytes = minBytes;
         this.maxBytes = maxBytes;
         this.maxBinMillis = maxBinMillis;
         this.maxBinAge = maxBinAge;
-        this.recordCountAttribute = Optional.ofNullable(recordCountAttribute);
+        this.fragmentCountAttribute = Optional.ofNullable(fragmentCountAttribute);
     }
 
     public int getMinRecords() {
@@ -63,7 +63,7 @@ public class RecordBinThresholds {
         return maxBinAge;
     }
 
-    public Optional<String> getRecordCountAttribute() {
-        return recordCountAttribute;
+    public Optional<String> getFragmentCountAttribute() {
+        return fragmentCountAttribute;
     }
 }