You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2019/08/29 14:35:33 UTC

[nifi] branch master updated: NIFI-6025: Include Processor 'scheduled state' (i.e., Enabled or Disabled) in the VersionedProcessor when pushing to Flow Registry and take into account when updating flows on the NiFi side

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ae1fec  NIFI-6025: Include Processor 'scheduled state' (i.e., Enabled or Disabled) in the VersionedProcessor when pushing to Flow Registry and take into account when updating flows on the NiFi side
4ae1fec is described below

commit 4ae1fec78a77be5b9b06bd79fa22ae020f3365bc
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jun 24 11:41:40 2019 -0400

    NIFI-6025: Include Processor 'scheduled state' (i.e., Enabled or Disabled) in the VersionedProcessor when pushing to Flow Registry and take into account when updating flows on the NiFi side
    
    NIFI-6025: Include difference in Scheduled State as a Local Flow Difference
    
    This closes #3546.
    
    Signed-off-by: Bryan Bende <bb...@apache.org>
---
 .../main/java/org/apache/nifi/groups/StandardProcessGroup.java | 10 ++++++++++
 .../nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java     |  3 +++
 .../main/java/org/apache/nifi/web/api/VersionsResource.java    |  8 ++++----
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index b3e2a7c..858d826 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -458,6 +458,8 @@ public final class StandardProcessGroup implements ProcessGroup {
         } finally {
             readLock.unlock();
         }
+
+        onComponentModified();
     }
 
     @Override
@@ -477,6 +479,8 @@ public final class StandardProcessGroup implements ProcessGroup {
         } finally {
             readLock.unlock();
         }
+
+        onComponentModified();
     }
 
     private StateManager getStateManager(final String componentId) {
@@ -4544,6 +4548,12 @@ public final class StandardProcessGroup implements ProcessGroup {
             processor.setYieldPeriod(proposed.getYieldDuration());
             processor.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
 
+            if (proposed.getScheduledState() == org.apache.nifi.registry.flow.ScheduledState.DISABLED) {
+                disableProcessor(processor);
+            } else if (processor.getScheduledState() == ScheduledState.DISABLED) {
+                enableProcessor(processor);
+            }
+
             if (!isEqual(processor.getBundleCoordinate(), proposed.getBundle())) {
                 final BundleCoordinate newBundleCoordinate = toCoordinate(proposed.getBundle());
                 final List<PropertyDescriptor> descriptors = new ArrayList<>(processor.getProperties().keySet());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index fdfc143..67be279 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -27,6 +27,7 @@ import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.ComponentNode;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.PropertyConfiguration;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -544,6 +545,8 @@ public class NiFiRegistryFlowMapper {
         processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
         processor.setStyle(procNode.getStyle());
         processor.setYieldDuration(procNode.getYieldPeriod());
+        processor.setScheduledState(procNode.getScheduledState() == ScheduledState.DISABLED ? org.apache.nifi.registry.flow.ScheduledState.DISABLED
+            : org.apache.nifi.registry.flow.ScheduledState.ENABLED);
 
         return processor;
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
index 84744e2..8aec9c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/VersionsResource.java
@@ -1214,7 +1214,7 @@ public class VersionsResource extends ApplicationResource {
                     } catch (final ResumeFlowException rfe) {
                         // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
                         // since in this case the flow was successfully updated - we just couldn't re-enable the components.
-                        logger.error(rfe.getMessage(), rfe);
+                        logger.warn(rfe.getMessage(), rfe);
                         vcur.fail(rfe.getMessage());
                     } catch (final Exception e) {
                         logger.error("Failed to update flow to new version", e);
@@ -1415,7 +1415,7 @@ public class VersionsResource extends ApplicationResource {
                     } catch (final ResumeFlowException rfe) {
                         // Treat ResumeFlowException differently because we don't want to include a message that we couldn't update the flow
                         // since in this case the flow was successfully updated - we just couldn't re-enable the components.
-                        logger.error(rfe.getMessage(), rfe);
+                        logger.warn(rfe.getMessage(), rfe);
                         vcur.fail(rfe.getMessage());
                     } catch (final Exception e) {
                         logger.error("Failed to update flow to new version", e);
@@ -1586,7 +1586,7 @@ public class VersionsResource extends ApplicationResource {
                 } catch (final IllegalStateException ise) {
                     // Component Lifecycle will re-enable the Controller Services only if they are valid. If IllegalStateException gets thrown, we need to provide
                     // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
-                    throw new ResumeFlowException("Failed to re-enable Controller Services because " + ise.getMessage(), ise);
+                    throw new ResumeFlowException("Successfully updated flow but could not re-enable all Controller Services because " + ise.getMessage(), ise);
                 }
             }
 
@@ -1638,7 +1638,7 @@ public class VersionsResource extends ApplicationResource {
                 } catch (final IllegalStateException ise) {
                     // Component Lifecycle will restart the Processors only if they are valid. If IllegalStateException gets thrown, we need to provide
                     // a more intelligent error message as to exactly what happened, rather than indicate that the flow could not be updated.
-                    throw new ResumeFlowException("Failed to restart components because " + ise.getMessage(), ise);
+                    throw new ResumeFlowException("Successfully updated flow but could not restart all Processors because " + ise.getMessage(), ise);
                 }
             }
         }