You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/05/24 17:36:25 UTC

nifi git commit: NIFI-3969: Prevent merging flowfiles prematurely when all bins fill but some are already full and can be processed

Repository: nifi
Updated Branches:
  refs/heads/master bb96b0f46 -> 08b66b5b6


NIFI-3969: Prevent merging flowfiles prematurely when all bins fill but some are already full and can be processed

Signed-off-by: Pierre Villard <pi...@gmail.com>

This closes #1850.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/08b66b5b
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/08b66b5b
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/08b66b5b

Branch: refs/heads/master
Commit: 08b66b5b6a2fd4194d55a98900c1d898b4e74a28
Parents: bb96b0f
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 24 08:27:06 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Wed May 24 19:36:18 2017 +0200

----------------------------------------------------------------------
 .../apache/nifi/processor/util/bin/BinFiles.java  |  2 +-
 .../processors/standard/TestMergeContent.java     | 18 ++++++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/08b66b5b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
index 67e37c2..b15d23b 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-processor-utils/src/main/java/org/apache/nifi/processor/util/bin/BinFiles.java
@@ -205,7 +205,7 @@ public abstract class BinFiles extends AbstractSessionFactoryProcessor {
         // if we have created all of the bins that are allowed, go ahead and remove the oldest one. If we don't do
         // this, then we will simply wait for it to expire because we can't get any more FlowFiles into the
         // bins. So we may as well expire it now.
-        if (added == 0 && (readyBins.size() + binManager.getBinCount()) >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
+        if (added == 0 && binManager.getBinCount() >= context.getProperty(MAX_BIN_COUNT).asInteger()) {
             final Bin bin = binManager.removeOldestBin();
             if (bin != null) {
                 added++;

http://git-wip-us.apache.org/repos/asf/nifi/blob/08b66b5b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index b6025c5..b8f9210 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -911,6 +911,24 @@ public class TestMergeContent {
         bundle.assertAttributeExists(MergeContent.MERGE_BIN_AGE_ATTRIBUTE);
     }
 
+    @Test
+    public void testLeavesSmallBinUnmerged() {
+        final TestRunner runner = TestRunners.newTestRunner(new MergeContent());
+        runner.setProperty(MergeContent.MIN_ENTRIES, "5");
+        runner.setProperty(MergeContent.MAX_ENTRIES, "5");
+        runner.setProperty(MergeContent.MAX_BIN_COUNT, "3");
+
+        for (int i = 0; i < 17; i++) {
+            runner.enqueue(String.valueOf(i) + "\n");
+        }
+
+        runner.run(5);
+
+        runner.assertTransferCount(MergeContent.REL_MERGED, 3);
+        runner.assertTransferCount(MergeContent.REL_ORIGINAL, 15);
+        assertEquals(2, runner.getQueueSize().getObjectCount());
+    }
+
     private void createFlowFiles(final TestRunner testRunner) throws UnsupportedEncodingException {
         final Map<String, String> attributes = new HashMap<>();
         attributes.put(CoreAttributes.MIME_TYPE.key(), "application/plain-text");