You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/05/24 17:36:25 UTC
nifi git commit: NIFI-3969: Prevent merging flowfiles prematurely when all bins fill but some are already full and can be processed
Repository: nifi
Updated Branches:
refs/heads/master bb96b0f46 -> 08b66b5b6
NIFI-3969: Prevent merging flowfiles prematurely when all bins fill but some are already full and can be processed
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #1850.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08b66b5b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08b66b5b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08b66b5b
Branch: refs/heads/master
Commit: 08b66b5b6a2fd4194d55a98900c1d898b4e74a28
Parents: bb96b0f
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 24 08:27:06 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed May 24 19:36:18 2017 +0200
----------------------------------------------------------------------
.../apache/nifi/processor/util/bin/BinFiles.java | 2 +-
.../processors/standard/TestMergeContent.java | 18 ++++++++++++++++++
2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/08b66b5b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 67e37c2..b15d23b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -205,7 +205,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
// if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
// this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
// bins. So we may as well expire it now.
- if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+ if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
final Bin bin = binManager.removeOldestBin();
if (bin != null) {
added++;
http://git-wip-us.apache.org/repos/asf/nifi/blob/08b66b5b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index b6025c5..b8f9210 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -911,6 +911,24 @@ public class TestMergeContent {
bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE);
}
+ @Test
+ public void testLeavesSmallBinUnmerged() { // regression test for NIFI-3969: a partially filled bin must not be merged prematurely
+ final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+ runner.setProperty(MergeContent.MIN_ENTRIES, "5"); // min and max entries are both 5
+ runner.setProperty(MergeContent.MAX_ENTRIES, "5");
+ runner.setProperty(MergeContent.MAX_BIN_COUNT, "3"); // at most 3 bins
+
+ for (int i = 0; i < 17; i++) { // 17 = 3 * 5 + 2: enough input for three bins of 5 plus two leftovers
+ runner.enqueue(String.valueOf(i) + "\n"); // each flowfile's content is its index followed by a newline
+ }
+
+ runner.run(5); // presumably five processor iterations -- confirm against the TestRunner API
+
+ runner.assertTransferCount(MergeContent.REL_MERGED, 3); // expect exactly three merged outputs
+ runner.assertTransferCount(MergeContent.REL_ORIGINAL, 15); // expect the 15 inputs (3 * 5) routed to 'original'
+ assertEquals(2, runner.getQueueSize().getObjectCount()); // the 2 leftover flowfiles must still be counted in the queue, i.e. not merged
+ }
+
private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
final Map<String, String> attributes = new HashMap<>();
attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");