Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/23 22:34:55 UTC

[GitHub] [nifi] turcsanyip opened a new pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

turcsanyip opened a new pull request #5550:
URL: https://github.com/apache/nifi/pull/5550


   … a single onTrigger
   
   MergeRecord processed FlowFiles across multiple onTrigger calls, and it needed an extra onTrigger call
   (with no incoming FFs) to detect that no more FFs were available and that it was time to send the merged FF downstream.
   This was not compatible with the Stateless Runtime, which does not trigger the flow again once no FFs are available.
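   A minimal, self-contained sketch of the difference described above, under simplifying assumptions: `DrainLoopSketch`, its in-memory queue, `BATCH_SIZE` and `MIN_BIN_SIZE` are hypothetical stand-ins, not NiFi's MergeRecord, ProcessSession or bin manager API. `onTriggerOneBatch` models the old shape (one pull per call, bins completed only by a call that finds the queue empty); `onTriggerDrainLoop` models the new shape (keep pulling within one call, then complete bins).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of MergeRecord-style binning; not the real NiFi API.
class DrainLoopSketch {
    static final int BATCH_SIZE = 3;   // stands in for the per-pull FlowFile limit
    static final int MIN_BIN_SIZE = 5; // hypothetical minimum bin size

    final Queue<String> queue = new ArrayDeque<>();
    final List<String> bin = new ArrayList<>();
    final List<List<String>> merged = new ArrayList<>();

    // Pull up to BATCH_SIZE items from the queue.
    private List<String> pullBatch() {
        List<String> batch = new ArrayList<>();
        while (batch.size() < BATCH_SIZE && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    // Emit the bin if it has reached the minimum size.
    private void completeFullEnoughBin() {
        if (bin.size() >= MIN_BIN_SIZE) {
            merged.add(new ArrayList<>(bin));
            bin.clear();
        }
    }

    // Old shape: one batch per call; only a call that finds the queue empty completes the bin.
    void onTriggerOneBatch() {
        List<String> batch = pullBatch();
        bin.addAll(batch);
        if (batch.isEmpty()) {
            completeFullEnoughBin();
        }
    }

    // New shape: drain the queue within a single call, then complete the bin.
    void onTriggerDrainLoop() {
        while (true) {
            List<String> batch = pullBatch();
            if (batch.isEmpty()) {
                break; // queue drained; no extra empty call is needed to notice it
            }
            bin.addAll(batch);
        }
        completeFullEnoughBin();
    }

    public static void main(String[] args) {
        DrainLoopSketch p = new DrainLoopSketch();
        for (int i = 0; i < 7; i++) {
            p.queue.add("ff-" + i);
        }
        p.onTriggerDrainLoop();
        System.out.println(p.merged.size() + " merged bin(s), " + p.bin.size() + " item(s) still binned");
    }
}
```

   With 7 queued items, one `onTriggerDrainLoop` call produces a merged bin, while `onTriggerOneBatch` needs a fourth call that finds the queue empty before it completes anything; a runtime that stops triggering once the queue is empty would never make that fourth call.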
   
   Also changed the "unschedule" logic in StandardProcessorTestRunner: @OnUnscheduled methods were called immediately after
   the first FlowFile was processed. The processor is now unscheduled only at the end of the execution (after onTrigger has finished),
   and only if stopOnFinish has been requested by the test case.
   
   https://issues.apache.org/jira/browse/NIFI-9391
   
   In order to streamline the review of the contribution we ask you
   to ensure the following steps have been taken:
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)?
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to .name (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] Lehel44 commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r757946332



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }

Review comment:
       ```suggestion
               block = context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet();
   ```
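       A small sketch to check the simplification, assuming the two booleans stand in for `MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)` and `context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()` (`BlockFlagSketch` is hypothetical). Taken as a whole, the `if`/`else if`/`else` chain is equivalent to a single `||` of the two conditions, so the Defragment check has to survive in whatever replaces it.

```java
// Hypothetical check that the if / else-if / else chain equals a single OR.
class BlockFlagSketch {
    // Mirrors the chain in the diff: defragment -> true, else correlation set -> true, else false.
    static boolean blockChain(boolean defragment, boolean correlationSet) {
        final boolean block;
        if (defragment) {
            block = true;
        } else if (correlationSet) {
            block = true;
        } else {
            block = false;
        }
        return block;
    }

    // Single-expression equivalent of the chain above.
    static boolean blockExpr(boolean defragment, boolean correlationSet) {
        return defragment || correlationSet;
    }

    public static void main(String[] args) {
        for (boolean d : new boolean[] {false, true}) {
            for (boolean c : new boolean[] {false, true}) {
                System.out.println(d + ", " + c + " -> " + blockChain(d, c) + " / " + blockExpr(d, c));
            }
        }
    }
}
```

       All four input combinations give the same result for both forms, so the whole block could become `final boolean block = MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy) || context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet();`.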







[GitHub] [nifi] turcsanyip commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r762804091



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       @markap14 Thanks for catching it. Modified the log statements.







[GitHub] [nifi] turcsanyip commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r762803566



##########
File path: nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
##########
@@ -221,28 +221,18 @@ public void run(final int iterations, final boolean stopOnFinish, final boolean
             } catch (final InterruptedException e1) {
             }
 
-            int finishedCount = 0;
-            boolean unscheduledRun = false;
             for (final Future<Throwable> future : futures) {
                 try {
                     final Throwable thrown = future.get(); // wait for the result
                     if (thrown != null) {
                         throw new AssertionError(thrown);
                     }
-
-                    if (++finishedCount == 1) {
-                        unscheduledRun = true;
-                        unSchedule();
-                    }
                 } catch (final Exception e) {
                 }
             }
 
-            if (!unscheduledRun) {
-                unSchedule();
-            }
-
             if (stopOnFinish) {
+                unSchedule();

Review comment:
       @markap14 I reverted the change and added `stopOnFinish` checks to the existing `if` statements.
   Could you please check this version?







[GitHub] [nifi] markap14 commented on pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
markap14 commented on pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#issuecomment-1004965060


   Thanks for the fix & the updates @turcsanyip! All looks good to me. +1 will merge to main





[GitHub] [nifi] markap14 merged pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
markap14 merged pull request #5550:
URL: https://github.com/apache/nifi/pull/5550


   





[GitHub] [nifi] markap14 commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
markap14 commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r756140343



##########
File path: nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
##########
@@ -221,28 +221,18 @@ public void run(final int iterations, final boolean stopOnFinish, final boolean
             } catch (final InterruptedException e1) {
             }
 
-            int finishedCount = 0;
-            boolean unscheduledRun = false;
             for (final Future<Throwable> future : futures) {
                 try {
                     final Throwable thrown = future.get(); // wait for the result
                     if (thrown != null) {
                         throw new AssertionError(thrown);
                     }
-
-                    if (++finishedCount == 1) {
-                        unscheduledRun = true;
-                        unSchedule();
-                    }
                 } catch (final Exception e) {
                 }
             }
 
-            if (!unscheduledRun) {
-                unSchedule();
-            }
-
             if (stopOnFinish) {
+                unSchedule();

Review comment:
       I'm not sure that I agree with this change. It is much simpler, but it changes the semantics of how this runs a bit. If the TestRunner is configured to use more than one thread and multiple iterations, the old code would call `@OnUnscheduled` methods after the first thread completes, while other threads may still be running. This change makes `@OnUnscheduled` and `@OnStopped` methods functionally the same in the testing framework, while they are very different within NiFi. It's a fairly common mistake for developers to use `@OnUnscheduled` when they should use `@OnStopped`, so the code as-is could highlight such issues in unit tests, while the change prevents it from catching this mistake. I do see the need to check the `stopOnFinish` flag, but perhaps it makes more sense to just update the `if` conditional to `if (++finishedCount == 1 && stopOnFinish) {`?
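       A minimal sketch of the conditional proposed here, assuming a simplified runner: `UnscheduleSketch`, its `events` list, `stop()`, and the no-op tasks are hypothetical, and this is not the actual `StandardProcessorTestRunner` code. It keeps the "unschedule after the first result" timing and gates both `unSchedule()` calls on `stopOnFinish`, roughly in the spirit of the follow-up that added `stopOnFinish` checks to the existing `if` statements.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical, stripped-down runner; records lifecycle events instead of invoking annotated methods.
class UnscheduleSketch {
    final List<String> events = new ArrayList<>(); // only modified by the calling thread

    void unSchedule() { events.add("unscheduled"); } // stands in for the @OnUnscheduled phase
    void stop() { events.add("stopped"); }           // stands in for the @OnStopped phase

    void run(int threads, int iterations, boolean stopOnFinish) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Throwable>> futures = new ArrayList<>();
            for (int i = 0; i < iterations; i++) {
                Callable<Throwable> trigger = () -> null; // stand-in for one onTrigger call
                futures.add(pool.submit(trigger));
            }
            int finishedCount = 0;
            boolean unscheduledRun = false;
            for (Future<Throwable> future : futures) {
                try {
                    future.get(); // wait for this task, in submission order
                } catch (Exception e) {
                    throw new IllegalStateException(e);
                }
                // Unschedule once the first result is in (other tasks may still be running),
                // but only when the test asked the runner to stop.
                if (++finishedCount == 1 && stopOnFinish) {
                    unscheduledRun = true;
                    unSchedule();
                }
            }
            if (!unscheduledRun && stopOnFinish) {
                unSchedule(); // e.g. zero iterations
            }
            if (stopOnFinish) {
                stop();
            }
        } finally {
            pool.shutdown();
        }
    }
}
```

       In this sketch `run(2, 4, true)` records `unscheduled` followed by `stopped`, while `run(2, 4, false)` records nothing, so a test that keeps the processor scheduled does not see the unschedule phase at all.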

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);
             }
         }
 
-        // Complete any bins that have reached their expiration date
-        try {
-            completedBins += manager.completeExpiredBins();
-        } catch (final Exception e) {
-            getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
-        }
+        if (isScheduled()) {
+            // Complete any bins that have reached their expiration date
+            try {
+                manager.completeExpiredBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       Same comment about args to the logger. Need to pass a separate arg to match the `{}` matcher and another for the stacktrace.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       I don't think this change is correct. It will result in a log message that literally says `Failed to merge FlowFiles to create new bin due to {}`. Either use the `... due to " + e` form or `... due to {}", e, e);`, so that the first argument fills the `{}` while the second argument is the `Throwable` argument that causes a stack trace to be logged.
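       A minimal sketch of the placeholder convention described in this comment, assuming SLF4J-style behaviour where a trailing `Throwable` is always taken as the exception (stack trace) rather than as a value for a `{}`. `PlaceholderSketch` and `Formatted` are hypothetical; this is not NiFi's `ComponentLog` implementation.

```java
import java.util.Arrays;

// Hypothetical, simplified placeholder formatter illustrating the assumed convention.
class PlaceholderSketch {
    // Rendered message plus the Throwable whose stack trace would be logged.
    static final class Formatted {
        final String message;
        final Throwable throwable;

        Formatted(String message, Throwable throwable) {
            this.message = message;
            this.throwable = throwable;
        }
    }

    static Formatted format(String pattern, Object... args) {
        Throwable throwable = null;
        Object[] values = args;
        // Assumed convention: a trailing Throwable is always taken as the exception, not as a value.
        if (args.length > 0 && args[args.length - 1] instanceof Throwable) {
            throwable = (Throwable) args[args.length - 1];
            values = Arrays.copyOf(args, args.length - 1);
        }
        StringBuilder sb = new StringBuilder();
        int from = 0;
        int next = 0;
        int idx;
        // Fill each "{}" with the next remaining value, left to right.
        while (next < values.length && (idx = pattern.indexOf("{}", from)) >= 0) {
            sb.append(pattern, from, idx).append(values[next++]);
            from = idx + 2;
        }
        sb.append(pattern.substring(from)); // unfilled "{}" stay as literal text
        return new Formatted(sb.toString(), throwable);
    }

    public static void main(String[] args) {
        Exception e = new IllegalStateException("boom");
        System.out.println(format("Failed to merge FlowFiles to create new bin due to {}", e).message);
        System.out.println(format("Failed to merge FlowFiles to create new bin due to {}", e, e).message);
    }
}
```

       With a single `e`, the message keeps a literal `{}`; with `e, e`, the first copy fills the placeholder and the second is kept for the stack trace. The same reasoning applies to `"Failed to bin {} due to {}", flowFile, e, e`.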

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);
             }
         }
 
-        // Complete any bins that have reached their expiration date
-        try {
-            completedBins += manager.completeExpiredBins();
-        } catch (final Exception e) {
-            getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
-        }
+        if (isScheduled()) {
+            // Complete any bins that have reached their expiration date
+            try {
+                manager.completeExpiredBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);
+            }
+
+            // Complete any bins that meet their minimum size requirements
+            try {
+                manager.completeFullEnoughBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       Same comment about args to the logger. Need to pass a separate arg to match the `{}` matcher and another for the stacktrace.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);

Review comment:
       Need to pass a separate arg to match the `{}` placeholder and another for the stacktrace.
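       The reviewer's point follows the SLF4J-style convention: each `{}` consumes one argument, and only a trailing `Throwable` left over after every placeholder is filled gets logged with its stack trace. The sketch below is a simplified, self-contained model of that rule. `PlaceholderLogSketch` and `format` are hypothetical names, not NiFi or SLF4J API, and the sketch assumes NiFi's component logger treats arguments the same way, as the comment implies. Under that assumption, the call in the diff would become something like `getLogger().error("Failed to bin {} due to {}", flowFile, e, e);`.

```java
// Hypothetical, simplified model of SLF4J-style placeholder handling (not the real SLF4J or NiFi code).
public class PlaceholderLogSketch {

    static final class Formatted {
        final String message;
        final Throwable stackTrace; // null when no stack trace would be logged

        Formatted(String message, Throwable stackTrace) {
            this.message = message;
            this.stackTrace = stackTrace;
        }
    }

    // Each "{}" consumes one argument; a trailing Throwable NOT consumed by a placeholder
    // is treated as the exception whose stack trace gets logged.
    static Formatted format(String pattern, Object... args) {
        StringBuilder sb = new StringBuilder();
        int argIndex = 0;
        int from = 0;
        int at;
        while ((at = pattern.indexOf("{}", from)) >= 0 && argIndex < args.length) {
            sb.append(pattern, from, at).append(args[argIndex++]);
            from = at + 2;
        }
        sb.append(pattern.substring(from));

        Throwable trailing = null;
        if (argIndex < args.length && args[args.length - 1] instanceof Throwable) {
            trailing = (Throwable) args[args.length - 1];
        }
        return new Formatted(sb.toString(), trailing);
    }

    public static void main(String[] args) {
        Exception e = new IllegalStateException("bin full");
        // Shape used in the diff: the exception fills the second "{}", so only its toString() is logged.
        Formatted before = format("Failed to bin {} due to {}", "FlowFile[id=1]", e);
        // Suggested shape: one arg per placeholder, plus the exception again for the stack trace.
        Formatted after = format("Failed to bin {} due to {}", "FlowFile[id=1]", e, e);
        System.out.println(before.message + " | stack trace logged: " + (before.stackTrace != null));
        System.out.println(after.message + " | stack trace logged: " + (after.stackTrace != null));
    }
}
```

       The same reasoning applies to the single-placeholder call `"... due to {}", e`: the exception is consumed by the placeholder, so the stack trace is dropped unless `e` is passed a second time.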




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] Lehel44 commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
Lehel44 commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r757946332



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }

Review comment:
       ```suggestion
               } else block = context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet();
   ```
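       For comparison, the whole if / else-if / else chain can also be written as a single boolean expression. This hypothetical sketch checks that the two forms agree for every combination of inputs; `DEFRAGMENT` is an assumed stand-in for `MERGE_STRATEGY_DEFRAGMENT.getValue()`, the strategy strings are illustrative, and `correlationAttributeSet` stands in for `context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()`.

```java
// Hypothetical sketch: the chain from the diff vs. an equivalent single boolean expression.
public class BlockFlagSketch {

    // Assumed stand-in for MERGE_STRATEGY_DEFRAGMENT.getValue().
    static final String DEFRAGMENT = "Defragment";

    // Shape used in the diff: an if / else-if / else chain assigning a final local.
    static boolean blockChain(String mergeStrategy, boolean correlationAttributeSet) {
        final boolean block;
        if (DEFRAGMENT.equals(mergeStrategy)) {
            block = true;
        } else if (correlationAttributeSet) {
            block = true;
        } else {
            block = false;
        }
        return block;
    }

    // Same decision as one expression: block when defragmenting OR when a correlation attribute is set.
    static boolean blockExpression(String mergeStrategy, boolean correlationAttributeSet) {
        return DEFRAGMENT.equals(mergeStrategy) || correlationAttributeSet;
    }

    public static void main(String[] args) {
        String[] strategies = {"Defragment", "Bin-Packing Algorithm", null};
        for (String strategy : strategies) {
            for (boolean correlationSet : new boolean[] {true, false}) {
                if (blockChain(strategy, correlationSet) != blockExpression(strategy, correlationSet)) {
                    throw new AssertionError("Mismatch for " + strategy + ", " + correlationSet);
                }
            }
        }
        System.out.println("Both forms agree on all combinations");
    }
}
```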







[GitHub] [nifi] turcsanyip commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r756298420



##########
File path: nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
##########
@@ -221,28 +221,18 @@ public void run(final int iterations, final boolean stopOnFinish, final boolean
             } catch (final InterruptedException e1) {
             }
 
-            int finishedCount = 0;
-            boolean unscheduledRun = false;
             for (final Future<Throwable> future : futures) {
                 try {
                     final Throwable thrown = future.get(); // wait for the result
                     if (thrown != null) {
                         throw new AssertionError(thrown);
                     }
-
-                    if (++finishedCount == 1) {
-                        unscheduledRun = true;
-                        unSchedule();
-                    }
                 } catch (final Exception e) {
                 }
             }
 
-            if (!unscheduledRun) {
-                unSchedule();
-            }
-
             if (stopOnFinish) {
+                unSchedule();

Review comment:
       @markap14 Thanks for your feedback and for highlighting how the test framework runs the processors and the motivation behind that. I fully agree that using `@OnUnscheduled` and `@OnStopped` correctly is an important concern that needs to be tested somehow. It was just surprising to me (and it broke my tests) that the second run of the processor happened in an unscheduled state.
   My test case:
   - enqueue some FFs
   - run (trigger) the processor once
   - enqueue some more FFs
   - run (trigger) the processor once more
   
   I expected the processor to remain scheduled during and between the two runs (as normally happens when I don't want to stop it).
   
   I tried your suggestion of using `if (++finishedCount == 1 && stopOnFinish) {`. It works if I also add the stopOnFinish check to the fallback condition, i.e. `if (!unscheduledRun && stopOnFinish) {` (line 241 in the original version).
   With a single iteration (run(1)), this effectively falls back to my
   ```
               if (stopOnFinish) {
                   unSchedule();
                   stop();
               }
   ```
   solution.
   
   Would that be fine with you?
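   The proposed bookkeeping can be modeled without nifi-mock. In this hypothetical sketch (`UnscheduleLogicSketch` is not part of NiFi), each list entry stands in for one `future.get()` result, iterations are evaluated sequentially rather than on a thread pool, and the `initialize` flag stands in for the runner's option that triggers `@OnScheduled`. With `originalLogic` set, the runner always unschedules after the first finished iteration, so the second `run(1, false, false)` from the test case above starts unscheduled; with the `stopOnFinish` guards it stays scheduled. The sequential model does not capture the real framework's intent of unscheduling while other threads may still be inside `onTrigger`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the unschedule logic in the runner (not the real StandardProcessorTestRunner).
public class UnscheduleLogicSketch {

    boolean scheduled = false;
    int unscheduleCalls = 0;
    final boolean originalLogic;

    UnscheduleLogicSketch(boolean originalLogic) {
        this.originalLogic = originalLogic;
    }

    void unSchedule() {
        unscheduleCalls++;
        scheduled = false;
    }

    void run(int iterations, boolean stopOnFinish, boolean initialize) {
        if (initialize) {
            scheduled = true; // stands in for invoking @OnScheduled
        }
        // Each entry stands in for future.get(): null means onTrigger finished without throwing.
        List<Throwable> results = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            results.add(null);
        }

        int finishedCount = 0;
        boolean unscheduledRun = false;
        for (Throwable thrown : results) {
            if (thrown != null) {
                throw new AssertionError(thrown);
            }
            // Original: always unschedule after the first finished iteration.
            // Proposed: only when the test case requested stopOnFinish.
            if (++finishedCount == 1 && (originalLogic || stopOnFinish)) {
                unscheduledRun = true;
                unSchedule();
            }
        }
        // Fallback for zero iterations, guarded the same way.
        if (!unscheduledRun && (originalLogic || stopOnFinish)) {
            unSchedule();
        }
    }

    public static void main(String[] args) {
        for (boolean original : new boolean[] {true, false}) {
            UnscheduleLogicSketch runner = new UnscheduleLogicSketch(original);
            runner.run(1, false, true);   // first trigger, keep the processor running
            runner.run(1, false, false);  // second trigger after enqueuing more FlowFiles
            System.out.println((original ? "original" : "proposed")
                    + ": scheduled after two runs = " + runner.scheduled);
        }
    }
}
```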







[GitHub] [nifi] turcsanyip commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

Posted by GitBox <gi...@apache.org>.
turcsanyip commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r762805421



##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }

Review comment:
       @Lehel44 This code block has not changed in this PR (apart from indentation), so I would rather not modify it.



