You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2018/09/24 20:36:15 UTC
nifi git commit: NIFI-5514: Fixed bugs in MergeRecord around minimum
thresholds not being honored and validation not being performed to ensure
that the minimum threshold is not greater than the maximum threshold (would
previously allow min records = 100, max records = 2 as a valid configuration)
Repository: nifi
Updated Branches:
refs/heads/master f5e9ea680 -> 2a964681e
NIFI-5514: Fixed bugs in MergeRecord around minimum thresholds not being honored and validation not being performed to ensure that the minimum threshold is not greater than the maximum threshold (previously, min records = 100 with max records = 2 was accepted as a valid configuration)
NIFI-5514: Do not rely on ProcessSession.getQueueSize() returning 0 objects to detect that no more data is queued: if the processor itself is holding onto FlowFiles, the reported queue size will not be 0.
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2954.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/2a964681
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/2a964681
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/2a964681
Branch: refs/heads/master
Commit: 2a964681eca443cc335b6f269d2b9ddab57250c7
Parents: f5e9ea6
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Aug 16 14:08:43 2018 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Mon Sep 24 22:36:02 2018 +0200
----------------------------------------------------------------------
.../nifi/processors/standard/MergeRecord.java | 68 ++++++++++++++++----
.../processors/standard/merge/RecordBin.java | 31 ++++-----
.../standard/merge/RecordBinManager.java | 44 ++++++++-----
.../processors/standard/TestMergeRecord.java | 31 ++++++---
4 files changed, 121 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
index 56227f2..ecebb8e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
@@ -17,17 +17,6 @@
package org.apache.nifi.processors.standard;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
@@ -43,6 +32,8 @@ import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.FragmentAttributes;
@@ -64,6 +55,18 @@ import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
@SideEffectFree
@TriggerWhenEmpty
@@ -261,6 +264,34 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
binManager.set(null);
}
+ @Override
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>();
+
+ final Integer minRecords = validationContext.getProperty(MIN_RECORDS).asInteger();
+ final Integer maxRecords = validationContext.getProperty(MAX_RECORDS).asInteger();
+ if (minRecords != null && maxRecords != null && maxRecords < minRecords) {
+ results.add(new ValidationResult.Builder()
+ .subject("Max Records")
+ .input(String.valueOf(maxRecords))
+ .valid(false)
+ .explanation("<Maximum Number of Records> property cannot be smaller than <Minimum Number of Records> property")
+ .build());
+ }
+
+ final Double minSize = validationContext.getProperty(MIN_SIZE).asDataSize(DataUnit.B);
+ final Double maxSize = validationContext.getProperty(MAX_SIZE).asDataSize(DataUnit.B);
+ if (minSize != null && maxSize != null && maxSize < minSize) {
+ results.add(new ValidationResult.Builder()
+ .subject("Max Size")
+ .input(validationContext.getProperty(MAX_SIZE).getValue())
+ .valid(false)
+ .explanation("<Maximum Bin Size> property cannot be smaller than <Minimum Bin Size> property")
+ .build());
+ }
+
+ return results;
+ }
@Override
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
@@ -304,13 +335,24 @@ public class MergeRecord extends AbstractSessionFactoryProcessor {
session.commit();
}
+ // If there is no more data queued up, complete any bin that meets our minimum threshold
+ int completedBins = 0;
+ if (flowFiles.isEmpty()) {
+ try {
+ completedBins += manager.completeFullEnoughBins();
+ } catch (final Exception e) {
+ getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+ }
+ }
+
+ // Complete any bins that have reached their expiration date
try {
- manager.completeExpiredBins();
+ completedBins += manager.completeExpiredBins();
} catch (final Exception e) {
getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
}
- if (flowFiles.isEmpty()) {
+ if (completedBins == 0 && flowFiles.isEmpty()) {
getLogger().debug("No FlowFiles to bin; will yield");
context.yield();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
index 6dc4247..23f5edf 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java
@@ -17,6 +17,19 @@
package org.apache.nifi.processors.standard.merge;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processors.standard.MergeRecord;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
@@ -32,21 +45,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.flowfile.attributes.CoreAttributes;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.standard.MergeRecord;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordSetWriter;
-import org.apache.nifi.serialization.RecordSetWriterFactory;
-import org.apache.nifi.serialization.WriteResult;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.stream.io.ByteCountingOutputStream;
-
public class RecordBin {
public static final String MERGE_COUNT_ATTRIBUTE = "merge.count";
public static final String MERGE_BIN_AGE_ATTRIBUTE = "merge.bin.age";
@@ -96,8 +94,7 @@ public class RecordBin {
return complete;
}
- public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block)
- throws IOException, MalformedRecordException, SchemaNotFoundException {
+ public boolean offer(final FlowFile flowFile, final RecordReader recordReader, final ProcessSession flowFileSession, final boolean block) throws IOException {
if (isComplete()) {
logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this});
http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
index 8496a4d..312832e 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java
@@ -17,19 +17,6 @@
package org.apache.nifi.processors.standard.merge;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -43,6 +30,20 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Predicate;
+
public class RecordBinManager {
private final ProcessContext context;
@@ -237,8 +238,16 @@ public class RecordBinManager {
}
- public void completeExpiredBins() throws IOException {
+ public int completeExpiredBins() throws IOException {
final long maxNanos = maxBinAgeNanos.get();
+ return handleCompletedBins(bin -> bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS));
+ }
+
+ public int completeFullEnoughBins() throws IOException {
+ return handleCompletedBins(bin -> bin.isFullEnough());
+ }
+
+ private int handleCompletedBins(final Predicate<RecordBin> completionTest) throws IOException {
final Map<String, List<RecordBin>> expiredBinMap = new HashMap<>();
lock.lock();
@@ -248,7 +257,7 @@ public class RecordBinManager {
final List<RecordBin> bins = entry.getValue();
for (final RecordBin bin : bins) {
- if (bin.isOlderThan(maxNanos, TimeUnit.NANOSECONDS)) {
+ if (completionTest.test(bin)) {
final List<RecordBin> expiredBinsForKey = expiredBinMap.computeIfAbsent(key, ignore -> new ArrayList<>());
expiredBinsForKey.add(bin);
}
@@ -258,6 +267,7 @@ public class RecordBinManager {
lock.unlock();
}
+ int completed = 0;
for (final Map.Entry<String, List<RecordBin>> entry : expiredBinMap.entrySet()) {
final String key = entry.getKey();
final List<RecordBin> expiredBins = entry.getValue();
@@ -265,12 +275,16 @@ public class RecordBinManager {
for (final RecordBin bin : expiredBins) {
logger.debug("Completing Bin {} because it has expired");
bin.complete("Bin has reached Max Bin Age");
+ completed++;
}
removeBins(key, expiredBins);
}
+
+ return completed;
}
+
private void removeBins(final String key, final List<RecordBin> bins) {
lock.lock();
try {
http://git-wip-us.apache.org/repos/asf/nifi/blob/2a964681/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
index 261f981..4ba57af 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeRecord.java
@@ -17,12 +17,6 @@
package org.apache.nifi.processors.standard;
-import static org.junit.Assert.assertEquals;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.record.CommaSeparatedRecordReader;
@@ -34,6 +28,12 @@ import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
public class TestMergeRecord {
private TestRunner runner;
private CommaSeparatedRecordReader readerService;
@@ -202,11 +202,24 @@ public class TestMergeRecord {
}
@Test
- public void testMinRecords() {
+ public void testValidation() {
runner.setProperty(MergeRecord.MIN_RECORDS, "103");
runner.setProperty(MergeRecord.MAX_RECORDS, "2");
runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+ runner.assertNotValid();
+
+ runner.setProperty(MergeRecord.MIN_RECORDS, "2");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "103");
+ runner.assertValid();
+ }
+
+ @Test
+ public void testMinRecords() {
+ runner.setProperty(MergeRecord.MIN_RECORDS, "103");
+ runner.setProperty(MergeRecord.MAX_RECORDS, "110");
+ runner.setProperty(MergeRecord.MIN_SIZE, "500 B");
+
runner.enqueue("Name, Age\nJohn, 35");
runner.enqueue("Name, Age\nJane, 34");
@@ -221,7 +234,7 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 0);
runner.enqueue("Name, Age\nJohn, 35");
- runner.run();
+ runner.run(2);
runner.assertTransferCount(MergeRecord.REL_MERGED, 1);
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 4);
}
@@ -240,6 +253,8 @@ public class TestMergeRecord {
runner.assertTransferCount(MergeRecord.REL_ORIGINAL, 30);
assertEquals(4, runner.getQueueSize().getObjectCount());
+
+ runner.getFlowFilesForRelationship(MergeRecord.REL_MERGED).stream().forEach(ff -> ff.assertAttributeEquals("record.count", "10"));
}
@Test