You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2021/12/14 17:15:28 UTC

[nifi] 10/15: NIFI-9393 Set Port Scheduled State for Flow Definitions

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch support/nifi-1.15
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 3955b88302b5f60e104c12b656fed64b5169b8b1
Author: exceptionfactory <ex...@apache.org>
AuthorDate: Thu Nov 18 14:04:17 2021 -0600

    NIFI-9393 Set Port Scheduled State for Flow Definitions
    
    - Set Scheduled State for Versioned Port and Versioned Remote Port when mapping Flow Definition
    - Updated StandardProcessGroup to disable Port based on Scheduled State of DISABLED
    - Updated StandardProcessGroup to set Remote Port transmitting based on Scheduled State of ENABLED
    
    Signed-off-by: Nathan Gough <th...@gmail.com>
    
    This closes #5534.
---
 .../java/org/apache/nifi/groups/StandardProcessGroup.java     |  7 +++++++
 .../nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java    | 11 +++++++++--
 .../registry/flow/mapping/NiFiRegistryFlowMapperTest.java     |  6 ++++++
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3583444..8618bd7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -4930,6 +4930,9 @@ public final class StandardProcessGroup implements ProcessGroup {
         port.setName(name);
         port.setPosition(new Position(proposed.getPosition().getX(), proposed.getPosition().getY()));
         port.setMaxConcurrentTasks(proposed.getConcurrentlySchedulableTaskCount());
+        if (org.apache.nifi.flow.ScheduledState.DISABLED == proposed.getScheduledState()) {
+            port.disable();
+        }
     }
 
     private Port addInputPort(final ProcessGroup destination, final VersionedPort proposed, final String componentIdSeed, final String temporaryName) {
@@ -5185,6 +5188,10 @@ public final class StandardProcessGroup implements ProcessGroup {
         descriptor.setId(generateUuid(proposed.getIdentifier(), rpgId, componentIdSeed));
         descriptor.setName(proposed.getName());
         descriptor.setUseCompression(proposed.isUseCompression());
+
+        final boolean transmitting = org.apache.nifi.flow.ScheduledState.ENABLED == proposed.getScheduledState();
+        descriptor.setTransmitting(transmitting);
+
         return descriptor;
     }
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
index b0b3334..889f3f1 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapper.java
@@ -581,6 +581,7 @@ public class NiFiRegistryFlowMapper {
         versionedPort.setName(port.getName());
         versionedPort.setPosition(mapPosition(port.getPosition()));
         versionedPort.setType(PortType.valueOf(port.getConnectableType().name()));
+        versionedPort.setScheduledState(mapScheduledState(port.getScheduledState()));
 
         if (port instanceof PublicPort) {
             versionedPort.setAllowRemoteAccess(true);
@@ -621,8 +622,7 @@ public class NiFiRegistryFlowMapper {
         processor.setSchedulingStrategy(procNode.getSchedulingStrategy().name());
         processor.setStyle(procNode.getStyle());
         processor.setYieldDuration(procNode.getYieldPeriod());
-        processor.setScheduledState(procNode.getScheduledState() == ScheduledState.DISABLED ? org.apache.nifi.flow.ScheduledState.DISABLED
-            : org.apache.nifi.flow.ScheduledState.ENABLED);
+        processor.setScheduledState(mapScheduledState(procNode.getScheduledState()));
 
         return processor;
     }
@@ -664,6 +664,7 @@ public class NiFiRegistryFlowMapper {
         port.setBatchSize(mapBatchSettings(remotePort));
         port.setTargetId(remotePort.getTargetIdentifier());
         port.setComponentType(componentType);
+        port.setScheduledState(mapScheduledState(remotePort.getScheduledState()));
         return port;
     }
 
@@ -730,4 +731,10 @@ public class NiFiRegistryFlowMapper {
         versionedParameter.setValue(descriptor.isSensitive() ? null : parameter.getValue());
         return versionedParameter;
     }
+
+    private org.apache.nifi.flow.ScheduledState mapScheduledState(final ScheduledState scheduledState) {
+         return scheduledState == ScheduledState.DISABLED
+                 ? org.apache.nifi.flow.ScheduledState.DISABLED
+                 : org.apache.nifi.flow.ScheduledState.ENABLED;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
index bdbf6c1..b5882b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/registry/flow/mapping/NiFiRegistryFlowMapperTest.java
@@ -34,6 +34,7 @@ import org.apache.nifi.connectable.Size;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.PropertyConfiguration;
+import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.LoadBalanceCompression;
@@ -436,6 +437,7 @@ public class NiFiRegistryFlowMapperTest {
         prepareComponentAuthorizable(port, processGroupId);
         preparePositionable(port);
         prepareConnectable(port, ConnectableType.valueOf(portType.name()));
+        when(port.getScheduledState()).thenReturn(ScheduledState.RUNNING);
         return port;
     }
 
@@ -532,6 +534,7 @@ public class NiFiRegistryFlowMapperTest {
         prepareComponentAuthorizable(remoteGroupPort, remoteProcessGroup.getIdentifier());
         when(remoteGroupPort.getName()).thenReturn("remotePort" + (counter++));
         when(remoteGroupPort.getRemoteProcessGroup()).thenReturn(remoteProcessGroup);
+        when(remoteGroupPort.getScheduledState()).thenReturn(ScheduledState.DISABLED);
         return remoteGroupPort;
     }
 
@@ -751,6 +754,7 @@ public class NiFiRegistryFlowMapperTest {
             assertEquals(port.getPosition().getY(), versionedPort.getPosition().getY(), 0);
             assertEquals(port.getName(), versionedPort.getName());
             assertEquals(portType, versionedPort.getType());
+            assertEquals(org.apache.nifi.flow.ScheduledState.ENABLED, versionedPort.getScheduledState());
         }
     }
 
@@ -767,6 +771,8 @@ public class NiFiRegistryFlowMapperTest {
             assertEquals(expectedPortGroupIdentifier, versionedRemotePort.getGroupIdentifier());
             assertEquals(remotePort.getName(), versionedRemotePort.getName());
             assertEquals(componentType, versionedRemotePort.getComponentType());
+            assertNotNull(versionedRemotePort.getScheduledState());
+            assertEquals(remotePort.getScheduledState().name(), versionedRemotePort.getScheduledState().name());
         }
     }