You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/09/24 20:36:15 UTC

nifi git commit: NIFI-5514: Fixed bugs in MergeRecord around minimum thresholds not being honored and validation not being performed to ensure that minimum threshold is smaller than max threshold (would previously allow min record = 100, max records = 2 as a valid configuration)

Repository: nifi
Updated Branches:
  refs/heads/master f5e9ea680 -> 2a964681e


NIFI-5514: Fixed bugs in MergeRecord around minimum thresholds not being honored and validation not being performed to ensure that minimum threshold is smaller than max threshold (would previously allow min record = 100, max records = 2 as a valid configuration)
NIFI-5514: Do not rely on ProcessSession.getQueueSize() to return a queue size of 0 objects because if the processor is holding onto data, the queue size won't be 0.

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #2954.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2a964681
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2a964681
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2a964681

Branch: refs/heads/master
Commit: 2a964681eca443cc335b6f269d2b9ddab57250c7
Parents: f5e9ea6
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Aug 16 14:08:43 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Mon Sep 24 22:36:02 2018 +0200

----------------------------------------------------------------------
 .../nifi/processors/standard/MergeRecord.java   | 68 ++++++++++++++++----
 .../processors/standard/merge/RecordBin.java    | 31 ++++-----
 .../standard/merge/RecordBinManager.java        | 44 ++++++++-----
 .../processors/standard/TestMergeRecord.java    | 31 ++++++---
 4 files changed, 121 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 56227f2..ecebb8e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -17,17 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -43,6 +32,8 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.FragmentAttributes;
@@ -64,6 +55,18 @@ import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
 
 @SideEffectFree
 @TriggerWhenEmpty
@@ -261,6 +264,34 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
         binManager.set(null);
     }
 
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>();
+
+        final Integer minRecords = validationContext.getProperty(MIN_RECORDS).asInteger();
+        final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).asInteger();
+        if (minRecords != null && maxRecords != null && maxRecords < minRecords) {
+            results.add(new ValidationResult.Builder()
+                .subject("Max Records")
+                .input(String.valueOf(maxRecords))
+                .valid(false)
+                .explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property")
+                .build());
+        }
+
+        final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
+        final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+        if (minSize != null && maxSize != null && maxSize < minSize) {
+            results.add(new ValidationResult.Builder()
+                .subject("Max Size")
+                .input(validationContext.getProperty(MAX_SIZE).getValue())
+                .valid(false)
+                .explanation("<Maximum Bin Size> property cannot be smaller than <Minimum Bin Size> property")
+                .build());
+        }
+
+        return results;
+    }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
@@ -304,13 +335,24 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
             session.commit();
         }
 
+        // If there is no more data queued up, complete any bin that meets our minimum threshold
+        int completedBins = 0;
+        if (flowFiles.isEmpty()) {
+            try {
+                completedBins += manager.completeFullEnoughBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+            }
+        }
+
+        // Complete any bins that have reached their expiration date
         try {
-            manager.completeExpiredBins();
+            completedBins += manager.completeExpiredBins();
         } catch (final Exception e) {
             getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
         }
 
-        if (flowFiles.isEmpty()) {
+        if (completedBins == 0 && flowFiles.isEmpty()) {
             getLogger().debug("No FlowFiles to bin; will yield");
             context.yield();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 6dc4247..23f5edf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -17,6 +17,19 @@
 
 package org.apache.nifi.processors.standard.merge;
 
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
@@ -32,21 +45,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.standard.MergeRecord;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-
 public class RecordBin {
     public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
     public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
@@ -96,8 +94,7 @@ public class RecordBin {
         return complete;
     }
 
-    public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block)
-        throws IOException, MalformedRecordException, SchemaNotFoundException {
+    public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException {
 
         if (isComplete()) {
             logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});

http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index 8496a4d..312832e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -17,19 +17,6 @@
 
 package org.apache.nifi.processors.standard.merge;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -43,6 +30,20 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
 public class RecordBinManager {
 
     private final ProcessContext context;
@@ -237,8 +238,16 @@ public class RecordBinManager {
     }
 
 
-    public void completeExpiredBins() throws IOException {
+    public int completeExpiredBins() throws IOException {
         final long maxNanos = maxBinAgeNanos.get();
+        return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS));
+    }
+
+    public int completeFullEnoughBins() throws IOException {
+        return handleCompletedBins(bin -> bin.isFullEnough());
+    }
+
+    private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException {
         final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
 
         lock.lock();
@@ -248,7 +257,7 @@ public class RecordBinManager {
                 final List<RecordBin> bins = entry.getValue();
 
                 for (final RecordBin bin : bins) {
-                    if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+                    if (completionTest.test(bin)) {
                         final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
                         expiredBinsForKey.add(bin);
                     }
@@ -258,6 +267,7 @@ public class RecordBinManager {
             lock.unlock();
         }
 
+        int completed = 0;
         for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
             final String key = entry.getKey();
             final List<RecordBin> expiredBins = entry.getValue();
@@ -265,12 +275,16 @@ public class RecordBinManager {
             for (final RecordBin bin : expiredBins) {
                 logger.debug("Completing Bin {} because it has expired");
                 bin.complete("Bin has reached Max Bin Age");
+                completed++;
             }
 
             removeBins(key, expiredBins);
         }
+
+        return completed;
     }
 
+
     private void removeBins(final String key, final List<RecordBin> bins) {
         lock.lock();
         try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 261f981..4ba57af 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -17,12 +17,6 @@
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
@@ -34,6 +28,12 @@ import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
 public class TestMergeRecord {
     private TestRunner runner;
     private CommaSeparatedRecordReader readerService;
@@ -202,11 +202,24 @@ public class TestMergeRecord {
     }
 
     @Test
-    public void testMinRecords() {
+    public void testValidation() {
         runner.setProperty(MergeRecord.MIN_RECORDS, "103");
         runner.setProperty(MergeRecord.MAX_RECORDS, "2");
         runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
 
+        runner.assertNotValid();
+
+        runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "103");
+        runner.assertValid();
+    }
+
+    @Test
+    public void testMinRecords() {
+        runner.setProperty(MergeRecord.MIN_RECORDS, "103");
+        runner.setProperty(MergeRecord.MAX_RECORDS, "110");
+        runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+
         runner.enqueue("Name, Age\nJohn, 35");
         runner.enqueue("Name, Age\nJane, 34");
 
@@ -221,7 +234,7 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
 
         runner.enqueue("Name, Age\nJohn, 35");
-        runner.run();
+        runner.run(2);
         runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
     }
@@ -240,6 +253,8 @@ public class TestMergeRecord {
         runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30);
 
         assertEquals(4, runner.getQueueSize().getObjectCount());
+
+        runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).stream().forEach(ff -> ff.assertAttributeEquals("record.count", "10"));
     }
 
     @Test