Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/11/24 14:51:28 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #5550: NIFI-9391: Modified MergeRecord to process FlowFiles within a loop in…

markap14 commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r756140343



##########
File path: nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
##########
@@ -221,28 +221,18 @@ public void run(final int iterations, final boolean stopOnFinish, final boolean
             } catch (final InterruptedException e1) {
             }
 
-            int finishedCount = 0;
-            boolean unscheduledRun = false;
             for (final Future<Throwable> future : futures) {
                 try {
                     final Throwable thrown = future.get(); // wait for the result
                     if (thrown != null) {
                         throw new AssertionError(thrown);
                     }
-
-                    if (++finishedCount == 1) {
-                        unscheduledRun = true;
-                        unSchedule();
-                    }
                 } catch (final Exception e) {
                 }
             }
 
-            if (!unscheduledRun) {
-                unSchedule();
-            }
-
             if (stopOnFinish) {
+                unSchedule();

Review comment:
       I'm not sure that I agree with this change. It is much simpler, but it slightly changes the semantics of how this runs. If the TestRunner is configured to use more than one thread and run multiple iterations, the old way would call `@OnUnscheduled` methods after the first thread completes, while other threads are potentially still running. This change makes `@OnUnscheduled` and `@OnStopped` methods functionally the same in the testing framework, while they are very different within NiFi. It's a fairly common mistake for developers to use `@OnUnscheduled` when they should use `@OnStopped`, so the code as-is would potentially highlight such issues in unit tests, while the change prevents it from catching those mistakes. I do see the need to check the `stopOnFinish` flag, but perhaps it makes more sense to just update the `if` conditional to `if (++finishedCount == 1 && stopOnFinish) {`?
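       For context, a minimal sketch of what that suggested conditional would look like in the removed loop (reconstructed from the deleted lines in the diff above; `futures`, `stopOnFinish`, and `unSchedule()` come from the surrounding `run(...)` method, and gating the trailing fallback on `stopOnFinish` is an assumption, not part of the explicit suggestion):

       ```java
       int finishedCount = 0;
       boolean unscheduledRun = false;
       for (final Future<Throwable> future : futures) {
           try {
               final Throwable thrown = future.get(); // wait for the result
               if (thrown != null) {
                   throw new AssertionError(thrown);
               }
               // Suggested change: invoke @OnUnscheduled once the first thread
               // finishes, but only when the caller asked to stop on finish.
               if (++finishedCount == 1 && stopOnFinish) {
                   unscheduledRun = true;
                   unSchedule();
               }
           } catch (final Exception e) {
           }
       }

       // Assumption: the trailing fallback would presumably also be gated on
       // stopOnFinish so unSchedule() is never called for a plain run.
       if (stopOnFinish && !unscheduledRun) {
           unSchedule();
       }
       ```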

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);
             }
         }
 
-        // Complete any bins that have reached their expiration date
-        try {
-            completedBins += manager.completeExpiredBins();
-        } catch (final Exception e) {
-            getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
-        }
+        if (isScheduled()) {
+            // Complete any bins that have reached their expiration date
+            try {
+                manager.completeExpiredBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       Same comment about args to the logger: you need to pass a separate arg to match the `{}` placeholder and another for the stack trace.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       I don't think this change is correct. It'll result in a log message that literally says `Failed to merge FlowFiles to create new bin due to {}`. You need to either use `... due to " + e` or `... due to {}", e, e);` so that the first argument matches the `{}` while the second argument is the `Throwable` that produces a stack trace.
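       For reference, a sketch of the two corrected forms this comment describes, using the same message and logger call from the diff:

       ```java
       // Option 1: concatenate the exception into the message and pass it
       // separately as the Throwable so a stack trace is logged.
       getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);

       // Option 2: keep the {} placeholder; the first `e` fills the {} and
       // the trailing `e` is treated as the Throwable for the stack trace.
       getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e, e);
       ```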

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);
+                        session.transfer(flowFile, REL_FAILURE);
+                    }
                 }
+            } finally {
+                session.commitAsync();
             }
-        } finally {
-            session.commitAsync();
-        }
 
-        // If there is no more data queued up, or strategy is defragment, complete any bin that meets our minimum threshold
-        // Otherwise, run one more cycle to process queued FlowFiles to add more fragment into available bins.
-        int completedBins = 0;
-        if (flowFiles.isEmpty() || MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+            // Complete any bins that have reached their expiration date
             try {
-                completedBins += manager.completeFullEnoughBins();
+                manager.completeExpiredBins();
             } catch (final Exception e) {
-                getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);
             }
         }
 
-        // Complete any bins that have reached their expiration date
-        try {
-            completedBins += manager.completeExpiredBins();
-        } catch (final Exception e) {
-            getLogger().error("Failed to merge FlowFiles to create new bin due to " + e, e);
-        }
+        if (isScheduled()) {
+            // Complete any bins that have reached their expiration date
+            try {
+                manager.completeExpiredBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);
+            }
+
+            // Complete any bins that meet their minimum size requirements
+            try {
+                manager.completeFullEnoughBins();
+            } catch (final Exception e) {
+                getLogger().error("Failed to merge FlowFiles to create new bin due to {}", e);

Review comment:
       Same comment about args to the logger: you need to pass a separate arg to match the `{}` placeholder and another for the stack trace.

##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final ProcessSessionFactory
             }
         }
 
-        final ProcessSession session = sessionFactory.createSession();
-        final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
-        if (getLogger().isDebugEnabled()) {
-            final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
-            getLogger().debug("Pulled {} FlowFiles from queue: {}", new Object[] {ids.size(), ids});
-        }
+        while (isScheduled()) {
+            final ProcessSession session = sessionFactory.createSession();
+            final List<FlowFile> flowFiles = session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+            if (flowFiles.isEmpty()) {
+                break;
+            }
+            if (getLogger().isDebugEnabled()) {
+                final List<String> ids = flowFiles.stream().map(ff -> "id=" + ff.getId()).collect(Collectors.toList());
+                getLogger().debug("Pulled {} FlowFiles from queue: {}", ids.size(), ids);
+            }
 
-        final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
-        final boolean block;
-        if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
-            block = true;
-        } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
-            block = true;
-        } else {
-            block = false;
-        }
+            final String mergeStrategy = context.getProperty(MERGE_STRATEGY).getValue();
+            final boolean block;
+            if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+                block = true;
+            } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+                block = true;
+            } else {
+                block = false;
+            }
 
-        try {
-            for (final FlowFile flowFile : flowFiles) {
-                try {
-                    binFlowFile(context, flowFile, session, manager, block);
-                } catch (final Exception e) {
-                    getLogger().error("Failed to bin {} due to {}", new Object[] {flowFile, e});
-                    session.transfer(flowFile, REL_FAILURE);
+            try {
+                for (final FlowFile flowFile : flowFiles) {
+                    try {
+                        binFlowFile(context, flowFile, session, manager, block);
+                    } catch (final Exception e) {
+                        getLogger().error("Failed to bin {} due to {}", flowFile, e);

Review comment:
       You need to pass a separate arg to match each `{}` placeholder and another for the stack trace.
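       Following the pattern described above, a sketch of the corrected call for this two-placeholder message (`flowFile` and the first `e` fill the two `{}` placeholders, and the trailing `e` supplies the stack trace):

       ```java
       getLogger().error("Failed to bin {} due to {}", flowFile, e, e);
       ```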



