You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/06/21 22:14:51 UTC

[nifi] branch support/nifi-1.x updated: NIFI-11737: Improved performance of FlowSynchronizationIT

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new a47f5b8045 NIFI-11737: Improved performance of FlowSynchronizationIT
a47f5b8045 is described below

commit a47f5b8045f84d5e2e5aa6482f10a908c55f4565
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jun 16 14:57:42 2023 -0400

    NIFI-11737: Improved performance of FlowSynchronizationIT
    
    - FlowSynchronizationIT no longer requires isDestroyEnvironmentAfterEachTest to return true
    
    This closes #7420
    
    Signed-off-by: David Handermann <ex...@apache.org>
    (cherry picked from commit 9709bd6fb749e1a1c7bc52076e69d96f5235802a)
---
 .../system/clustering/FlowSynchronizationIT.java   | 46 ++++++++++++++--------
 1 file changed, 30 insertions(+), 16 deletions(-)

diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index aff856f05d..d758529b12 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -96,16 +96,11 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         );
     }
 
-    @Override
-    protected boolean isDestroyEnvironmentAfterEachTest() {
-        return true;
-    }
-
 
     @Test
     public void testParameterUpdateWhileNodeDisconnected() throws NiFiClientException, IOException, InterruptedException {
         // Add Parameter context with Param1 = 1
-        final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("Context1", Collections.singletonMap("Param1", "1"));
+        final ParameterContextEntity parameterContextEntity = getClientUtil().createParameterContext("testParameterUpdateWhileNodeDisconnected", Collections.singletonMap("Param1", "1"));
         getClientUtil().setParameterContext("root", parameterContextEntity);
 
         // Create a GenerateFlowFile that adds an attribute with name 'attr' and a value that references the parameter
@@ -166,7 +161,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         final ProcessorEntity countEvents3 = getClientUtil().createProcessor("CountEvents");
 
         // Create parameter context with a sensitive parameter and set that on the root group
-        final ParameterContextEntity paramContext = getClientUtil().createParameterContext("context1", "MyParameter", "Our Secret", true);
+        final ParameterContextEntity paramContext = getClientUtil().createParameterContext("testSensitivePropertiesInherited", "MyParameter", "Our Secret", true);
         getClientUtil().setParameterContext("root", paramContext);
 
         // Set sensitive property of 1 processor to an explicit value and sensitive property of another to a sensitive parameter.
@@ -202,13 +197,14 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
 
     @Test
     public void testComponentsRecreatedOnRejoinCluster() throws NiFiClientException, IOException, InterruptedException {
+        final ProcessGroupEntity topLevel = getClientUtil().createProcessGroup("testComponentsRecreatedOnRejoinCluster", "root");
         // Build dataflow with processors at root level and an inner group that contains an input port, output port, and a processor, as well as a Controller Service that the processor will use.
-        final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile");
-        final ProcessGroupEntity group = getClientUtil().createProcessGroup("Inner Group", "root");
+        final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile", topLevel.getId());
+        final ProcessGroupEntity group = getClientUtil().createProcessGroup("Inner Group", topLevel.getId());
         final PortEntity inPort = getClientUtil().createInputPort("In", group.getId());
         final PortEntity outPort = getClientUtil().createOutputPort("Out", group.getId());
         final ProcessorEntity count = getClientUtil().createProcessor("CountFlowFiles", group.getId());
-        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
+        final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile", topLevel.getId());
         getClientUtil().updateProcessorSchedulingPeriod(generate, "60 sec");
 
         final ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService", group.getId());
@@ -230,7 +226,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         reportingTaskProperties.put("Text", "${now():toNumber()}");
         getClientUtil().updateReportingTaskProperties(reportingTask, reportingTaskProperties);
 
-        final ParameterContextEntity context = getClientUtil().createParameterContext("Context1", "abc", "hello", false);
+        final ParameterContextEntity context = getClientUtil().createParameterContext("testComponentsRecreatedOnRejoinCluster", "abc", "hello", false);
 
         // Disconnect Node 2
         disconnectNode(2);
@@ -260,7 +256,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         // is on Node 2.
         switchClientToNode(2);
 
-        final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup("root");
+        final ProcessGroupFlowEntity flow = getNifiClient().getFlowClient(DO_NOT_REPLICATE).getProcessGroup(topLevel.getId());
         final FlowDTO flowDto = flow.getProcessGroupFlow().getFlow();
         assertEquals(2, flowDto.getConnections().size());
         assertEquals(2, flowDto.getProcessors().size());
@@ -395,11 +391,12 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         // Wait for node to show as disconnected because it doesn't have the necessary nar
         waitForNodeState(2, NodeConnectionState.DISCONNECTED);
 
-        // We need to either restart Node 2 or remove it from the cluster in order to ensure that we can properly shutdown.
-        // Reconnecting to the cluster would require restoring the NAR file and restarting, which will take longer than simply removing the
-        // node from the cluster. So we opt for shutting down the node and removing it from the cluster.
+        // We need to restore the extensions nar and restart the node so that subsequent tests can succeed
+        restoreExtensionsNar(node2);
         node2.stop();
-        removeNode(2);
+        node2.start();
+
+        waitForAllNodesConnected();
     }
 
     private void removeNode(final int index) throws NiFiClientException, IOException, InterruptedException {
@@ -437,6 +434,17 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         waitForAllNodesConnected();
 
         assertTrue(getNifiClient().getProcessorClient().getProcessor(generate.getId()).getComponent().getExtensionMissing());
+
+        // In order to ensure that subsequent tests are able to operate properly, we need to restore the nar and restart
+        node1.stop();
+        node2.stop();
+
+        restoreExtensionsNar(node1);
+        restoreExtensionsNar(node2);
+
+        node1.start(false);
+        node2.start(true);
+        waitForAllNodesConnected();
     }
 
 
@@ -511,6 +519,12 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         assertTrue(extensionsNar.renameTo(backupFile));
     }
 
+    private void restoreExtensionsNar(final NiFiInstance nifiInstance) {
+        final File backupFile = getExtensionsNar(nifiInstance);
+        final File extensionsNar = new File(backupFile.getParentFile(), backupFile.getName().replace(".backup", ""));
+        assertTrue(backupFile.renameTo(extensionsNar));
+    }
+
     private File getExtensionsNar(final NiFiInstance nifiInstance) {
         final File libDir = new File(nifiInstance.getInstanceDirectory(), "lib");
         final File[] testExtensionsNar = libDir.listFiles(file -> file.getName().startsWith("nifi-system-test-extensions-nar-"));