You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/04/03 20:12:24 UTC

[nifi] 06/17: NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch NIFI-6169-RC2
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 11a9f10f5b67c53d8ab66dbfd04ffdcba681ead7
Author: Andres Garagiola <an...@gmail.com>
AuthorDate: Fri Feb 22 17:15:59 2019 -0300

    NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on
    
    This closes #3334.
    
    Signed-off-by: Andres Garagiola <an...@gmail.com>
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi/processors/standard/merge/RecordBin.java  | 55 +++++++++-------------
 .../standard/merge/RecordBinManager.java           | 10 ++--
 .../standard/merge/RecordBinThresholds.java        | 10 ++--
 3 files changed, 31 insertions(+), 44 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index d15ba0f..a96e119 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -63,6 +63,7 @@ public class RecordBin {
     private RecordSetWriter recordWriter;
     private ByteCountingOutputStream out;
     private int recordCount = 0;
+    private int fragmentCount = 0;
     private volatile boolean complete = false;
 
     private static final AtomicLong idGenerator = new AtomicLong(0L);
@@ -114,7 +115,7 @@ public class RecordBin {
         }
 
         boolean flowFileMigrated = false;
-
+        this.fragmentCount++;
         try {
             if (isComplete()) {
                 logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
@@ -202,26 +203,7 @@ public class RecordBin {
                 return false;
             }
 
-            int maxRecords;
-            final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
-            if (recordCountAttribute.isPresent()) {
-                final Optional<String> recordCountValue = flowFiles.stream()
-                    .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null)
-                    .map(ff -> ff.getAttribute(recordCountAttribute.get()))
-                    .findFirst();
-
-                if (!recordCountValue.isPresent()) {
-                    return false;
-                }
-
-                try {
-                    maxRecords = Integer.parseInt(recordCountValue.get());
-                } catch (final NumberFormatException e) {
-                    maxRecords = 1;
-                }
-            } else {
-                maxRecords = thresholds.getMaxRecords();
-            }
+            int maxRecords = thresholds.getMaxRecords();
 
             if (recordCount >= maxRecords) {
                 return true;
@@ -231,6 +213,22 @@ public class RecordBin {
                 return true;
             }
 
+            Optional<String> fragmentCountAttribute = thresholds.getFragmentCountAttribute();
+            if(fragmentCountAttribute != null && fragmentCountAttribute.isPresent()) {
+                final Optional<String> fragmentCountValue = flowFiles.stream()
+                        .filter(ff -> ff.getAttribute(fragmentCountAttribute.get()) != null)
+                        .map(ff -> ff.getAttribute(fragmentCountAttribute.get()))
+                        .findFirst();
+                if (fragmentCountValue.isPresent()) {
+                    try {
+                        int expectedFragments = Integer.parseInt(fragmentCountValue.get());
+                        if (this.fragmentCount == expectedFragments)
+                            return true;
+                    } catch (NumberFormatException nfe) {
+                        this.logger.error(nfe.getMessage(), nfe);
+                    }
+                }
+            }
             return false;
         } finally {
             readLock.unlock();
@@ -243,18 +241,7 @@ public class RecordBin {
             return currentCount;
         }
 
-        int requiredCount;
-        final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute();
-        if (recordCountAttribute.isPresent()) {
-            final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get());
-            try {
-                requiredCount = Integer.parseInt(recordCountValue);
-            } catch (final NumberFormatException e) {
-                requiredCount = 1;
-            }
-        } else {
-            requiredCount = thresholds.getMinRecords();
-        }
+        int requiredCount = thresholds.getMinRecords();
 
         this.requiredRecordCount = requiredCount;
         return requiredCount;
@@ -347,7 +334,7 @@ public class RecordBin {
             }
 
             // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin.
-            final Optional<String> countAttr = thresholds.getRecordCountAttribute();
+            final Optional<String> countAttr = thresholds.getFragmentCountAttribute();
             if (countAttr.isPresent()) {
                 // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value.
                 Integer expectedBinCount = null;
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index d1dde2a..f0891a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -189,17 +189,17 @@ public class RecordBinManager {
 
         final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE);
         final String maxBinAge = maxMillisValue.getValue();
-        final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE;
+        final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS) : Long.MAX_VALUE;
 
-        final String recordCountAttribute;
+        final String fragmentCountAttribute;
         final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue();
         if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
+            fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE;
         } else {
-            recordCountAttribute = null;
+            fragmentCountAttribute = null;
         }
 
-        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute);
+        return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, fragmentCountAttribute);
     }
 
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
index 16c05ac..8f1a5c8 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java
@@ -26,17 +26,17 @@ public class RecordBinThresholds {
     private final long maxBytes;
     private final long maxBinMillis;
     private final String maxBinAge;
-    private final Optional<String> recordCountAttribute;
+    private final Optional<String> fragmentCountAttribute;
 
     public RecordBinThresholds(final int minRecords, final int maxRecords, final long minBytes, final long maxBytes, final long maxBinMillis,
-        final String maxBinAge, final String recordCountAttribute) {
+        final String maxBinAge, final String fragmentCountAttribute) {
         this.minRecords = minRecords;
         this.maxRecords = maxRecords;
         this.minBytes = minBytes;
         this.maxBytes = maxBytes;
         this.maxBinMillis = maxBinMillis;
         this.maxBinAge = maxBinAge;
-        this.recordCountAttribute = Optional.ofNullable(recordCountAttribute);
+        this.fragmentCountAttribute = Optional.ofNullable(fragmentCountAttribute);
     }
 
     public int getMinRecords() {
@@ -63,7 +63,7 @@ public class RecordBinThresholds {
         return maxBinAge;
     }
 
-    public Optional<String> getRecordCountAttribute() {
-        return recordCountAttribute;
+    public Optional<String> getFragmentCountAttribute() {
+        return fragmentCountAttribute;
     }
 }