You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text version.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/07/28 18:36:42 UTC

[nifi] branch main updated: NIFI-10290: Updated system tests to ensure that they do not attempt to start processors while the processors are still validating.

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new d5386acb4f NIFI-10290: Updated system tests to ensure that they do not attempt to start processors while the processors are still validating.
d5386acb4f is described below

commit d5386acb4fd4d37629835665b9aa27056f20de71
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu Jul 28 11:11:04 2022 -0400

    NIFI-10290: Updated system tests to ensure that they do not attempt to start processors while the processors are still validating.
    
    This closes #6253
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../apache/nifi/tests/system/NiFiClientUtil.java   | 58 +++++++++++++++++++---
 .../system/clustering/FlowSynchronizationIT.java   |  2 +-
 .../clustering/JoinClusterAdjustStateIT.java       |  4 +-
 .../JoinClusterWithMissingConnectionWithData.java  |  2 +-
 .../tests/system/loadbalance/LoadBalanceIT.java    | 12 ++---
 .../system/parameters/ParameterContextIT.java      | 17 +++----
 .../tests/system/pg/BatchFlowBetweenGroupsIT.java  |  2 +-
 .../system/pg/SingleFlowFileConcurrencyIT.java     |  6 +--
 .../system/provenance/ProvenanceRepositoryIT.java  | 16 +++---
 .../tests/system/repositories/ContentAccessIT.java |  8 +--
 .../system/restart/FlowFileRestorationIT.java      |  2 +-
 .../tests/system/rpg/RemoteProcessGroupIT.java     |  2 +-
 .../system/variables/ProcessGroupVariablesIT.java  |  6 +--
 13 files changed, 88 insertions(+), 49 deletions(-)

diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 38e8a83fce..7276a53ace 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -101,6 +101,7 @@ import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -129,9 +130,11 @@ public class NiFiClientUtil {
         return client;
     }
 
-    public void startProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException {
+    public ProcessorEntity startProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException {
+        waitForValidationCompleted(currentEntity);
+
         currentEntity.setDisconnectedNodeAcknowledged(true);
-        getProcessorClient().startProcessor(currentEntity);
+        return getProcessorClient().startProcessor(currentEntity);
     }
 
     public void stopProcessor(final ProcessorEntity currentEntity) throws NiFiClientException, IOException, InterruptedException {
@@ -475,6 +478,19 @@ public class NiFiClientUtil {
         }
     }
 
+    public void waitForValidationCompleted(final ProcessorEntity processorEntity) throws NiFiClientException, IOException, InterruptedException {
+        String validationStatus;
+        do {
+            final ProcessorEntity currentEntity = getProcessorClient().getProcessor(processorEntity.getId());
+            validationStatus = currentEntity.getComponent().getValidationStatus();
+
+            if (validationStatus.equals(ProcessorDTO.VALIDATING)) {
+                logger.debug("Waiting for Processor {} to finish validating...", processorEntity.getId());
+                Thread.sleep(100L);
+            }
+        } while (Objects.equals(validationStatus, ProcessorDTO.VALIDATING));
+    }
+
     public void waitForRunningProcessor(final String processorId) throws InterruptedException, IOException, NiFiClientException {
         waitForProcessorState(processorId, "RUNNING");
     }
@@ -606,7 +622,9 @@ public class NiFiClientUtil {
         return counterValues;
     }
 
-    public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
+    public ScheduleComponentsEntity startProcessGroupComponents(final String groupId) throws NiFiClientException, IOException, InterruptedException {
+        waitForAllProcessorValidationToComplete(groupId);
+
         final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
         scheduleComponentsEntity.setId(groupId);
         scheduleComponentsEntity.setState("RUNNING");
@@ -616,6 +634,30 @@ public class NiFiClientUtil {
         return scheduleEntity;
     }
 
+    private void waitForAllProcessorValidationToComplete(final String groupId) throws NiFiClientException, IOException, InterruptedException {
+        final Set<ProcessorEntity> processors = findAllProcessors(groupId);
+
+        for (final ProcessorEntity processor : processors) {
+            waitForValidationCompleted(processor);
+        }
+    }
+
+    private Set<ProcessorEntity> findAllProcessors(final String groupId) throws NiFiClientException, IOException {
+        final Set<ProcessorEntity> processors = new HashSet<>();
+        findAllProcessors(groupId, processors);
+        return processors;
+    }
+
+    private void findAllProcessors(final String groupId, final Set<ProcessorEntity> allProcessors) throws NiFiClientException, IOException {
+        final ProcessGroupFlowEntity flowEntity = nifiClient.getFlowClient().getProcessGroup(groupId);
+        final FlowDTO flowDto = flowEntity.getProcessGroupFlow().getFlow();
+        allProcessors.addAll(flowDto.getProcessors());
+
+        for (final ProcessGroupEntity childGroup : flowDto.getProcessGroups()) {
+            findAllProcessors(childGroup.getId(), allProcessors);
+        }
+    }
+
     public ScheduleComponentsEntity stopProcessGroupComponents(final String groupId) throws NiFiClientException, IOException {
         final ScheduleComponentsEntity scheduleComponentsEntity = new ScheduleComponentsEntity();
         scheduleComponentsEntity.setId(groupId);
@@ -722,13 +764,13 @@ public class NiFiClientUtil {
     public void waitForControllerServiceRunStatus(final String id, final String requestedRunStatus) throws NiFiClientException, IOException {
         while (true) {
             final ControllerServiceEntity serviceEntity = nifiClient.getControllerServicesClient().getControllerService(id);
-            final String runStatus = serviceEntity.getStatus().getRunStatus();
-            if (requestedRunStatus.equals(runStatus)) {
-                logger.info("Controller Service [{}] run status [{}] found", id, runStatus);
+            final String serviceState = serviceEntity.getComponent().getState();
+            if (requestedRunStatus.equals(serviceState)) {
+                logger.info("Controller Service [{}] run status [{}] found", id, serviceState);
                 break;
             }
 
-            logger.info("Controller Service [{}] run status [{}] not matched [{}]: sleeping before retrying", id, runStatus, requestedRunStatus);
+            logger.info("Controller Service [{}] run status [{}] not matched [{}]: sleeping before retrying", id, serviceState, requestedRunStatus);
 
             try {
                 Thread.sleep(500L);
@@ -773,7 +815,7 @@ public class NiFiClientUtil {
     public void waitForControllerServiceValidationStatus(final String controllerServiceId, final String validationStatus) throws NiFiClientException, IOException {
         while (true) {
             final ControllerServiceEntity controllerServiceEntity = nifiClient.getControllerServicesClient().getControllerService(controllerServiceId);
-            final String currentValidationStatus = controllerServiceEntity.getStatus().getValidationStatus();
+            final String currentValidationStatus = controllerServiceEntity.getComponent().getValidationStatus();
             if (validationStatus.equals(currentValidationStatus)) {
                 logger.info("Controller Service ID [{}] Type [{}] Validation Status [{}] matched", controllerServiceId,
                         controllerServiceEntity.getComponent().getType(), validationStatus);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index ba8a606a6f..1ead90a772 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -733,7 +733,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         getClientUtil().enableControllerService(countB);
         getClientUtil().enableControllerService(countA);
 
-        getNifiClient().getProcessorClient().startProcessor(countFlowFiles);
+        getClientUtil().startProcessor(countFlowFiles);
 
         // Disconnect Node 2. Switch client to direct requests to Node 2 so that we can update the node while it's disconnected.
         disconnectNode(2);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java
index 6c89de2afb..5811e7b8b9 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterAdjustStateIT.java
@@ -70,7 +70,7 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT {
 
         // Start the Processor that requires a file named "monitored" to exist. When we join Node 2 to the cluster, this directory will not exist.
         // We want to ensure that the Processor does in fact start on its own when the directory is created.
-        getNifiClient().getProcessorClient().startProcessor(fileProcessor.getId(), fileProcessor.getRevision().getClientId(), 1);
+        getClientUtil().startProcessor(fileProcessor);
         getClientUtil().waitForRunningProcessor(fileProcessor.getId());
 
         // Create a new NiFi instance
@@ -84,7 +84,7 @@ public class JoinClusterAdjustStateIT extends NiFiSystemIT {
 
         // Start the Count Processor on Node 1. When Node 2 joins the cluster, we know that its flow will indicate that the Processor is stopped.
         // But because the cluster indicates that the Processor is running, the second node should inherit this value and immediately start the Processor also.
-        getNifiClient().getProcessorClient().startProcessor(countProcessor.getId(), countProcessor.getRevision().getClientId(), 1);
+        getClientUtil().startProcessor(countProcessor);
 
         node2Instance.start();
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java
index d5fcde5625..8694d14fcf 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/JoinClusterWithMissingConnectionWithData.java
@@ -64,7 +64,7 @@ public class JoinClusterWithMissingConnectionWithData extends NiFiSystemIT {
         ProcessorEntity generate = getNifiClient().getProcessorClient().getProcessor(GENERATE_UUID);
 
         // Start Generate Processor
-        generate = getNifiClient().getProcessorClient().startProcessor(generate);
+        generate = getClientUtil().startProcessor(generate);
 
         // Wait for data to be queued up, one FlowFile for each node.
         waitFor(this::isDataQueued);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 5e27f24737..d680fc7389 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -93,7 +93,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
         getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, compression, null);
 
         // Generate the data.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
 
         // Wait until all 20 FlowFiles are queued up.
         waitFor(() -> {
@@ -131,7 +131,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
         getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.SINGLE_NODE, LoadBalanceCompression.DO_NOT_COMPRESS, null);
 
         // Generate the data.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
 
         // Wait until all 20 FlowFiles are queued up.
         waitFor(() -> {
@@ -187,7 +187,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
         // Queue 100 FlowFiles. 10 with number=0, 10 with number=1, 10 with number=2, etc. to up 10 with number=9
         for (int i=1; i <= 10; i++) {
             // Generate the data.
-            getNifiClient().getProcessorClient().startProcessor(generate);
+            getClientUtil().startProcessor(generate);
 
             final int expectedQueueSize = 10 * i;
 
@@ -238,7 +238,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
         getClientUtil().updateProcessorProperties(generate, generateProperties);
 
         // Generate the data.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
 
         // Wait until all 20 FlowFiles are queued up.
         waitFor(() -> {
@@ -338,7 +338,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
         getClientUtil().updateConnectionLoadBalancing(connection, LoadBalanceStrategy.ROUND_ROBIN, LoadBalanceCompression.DO_NOT_COMPRESS, null);
 
         // Generate the data.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
 
         // Wait until all 20 FlowFiles are queued up.
         waitFor(() -> {
@@ -380,7 +380,7 @@ public class LoadBalanceIT extends NiFiSystemIT {
         // times out.
         while (true) {
             // Generate the data.
-            getNifiClient().getProcessorClient().startProcessor(generate);
+            getClientUtil().startProcessor(generate);
 
             // Wait until all 20 FlowFiles are queued up
             waitFor(() -> {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java
index 9526d62540..c9897be52b 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/parameters/ParameterContextIT.java
@@ -372,7 +372,7 @@ public class ParameterContextIT extends NiFiSystemIT {
         waitForValidProcessor(processorId);
 
         // Start Processors
-        getNifiClient().getProcessorClient().startProcessor(processorId, processorEntity.getRevision().getClientId(), 1);
+        getClientUtil().startProcessor(processorEntity);
 
         try {
             // Update Parameter Context to a long validation time.
@@ -388,9 +388,7 @@ public class ParameterContextIT extends NiFiSystemIT {
             waitForRunningProcessor(processorId);
         } finally {
             // Ensure that we stop the processor so that other tests are allowed to change the Parameter Context, etc.
-            getNifiClient().getProcessorClient().stopProcessor(processorId, processorEntity.getRevision().getClientId(), 2);
-            waitForStoppedProcessor(processorId);
-
+            getClientUtil().stopProcessor(processorEntity);
             getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3);
         }
     }
@@ -419,7 +417,7 @@ public class ParameterContextIT extends NiFiSystemIT {
             processorEntity.getComponent().getConfig().setAutoTerminatedRelationships(Collections.singleton("success"));
 
             getNifiClient().getProcessorClient().updateProcessor(processorEntity);
-            getNifiClient().getProcessorClient().startProcessor(processorId, processorEntity.getRevision().getClientId(), 1L);
+            getClientUtil().startProcessor(processorEntity);
 
             try {
                 final ParameterContextUpdateRequestEntity requestEntity = updateParameterContext(createdContextEntity, "sleep", "6 secs");
@@ -435,8 +433,7 @@ public class ParameterContextIT extends NiFiSystemIT {
 
                 waitForRunningProcessor(processorId);
             } finally {
-                getNifiClient().getProcessorClient().stopProcessor(processorId, processorEntity.getRevision().getClientId(), 1L);
-                waitForStoppedProcessor(processorId);
+                getClientUtil().stopProcessor(processorEntity);
                 getNifiClient().getProcessorClient().deleteProcessor(processorId, processorEntity.getRevision().getClientId(), 3);
             }
         } finally {
@@ -532,7 +529,7 @@ public class ParameterContextIT extends NiFiSystemIT {
         getClientUtil().setAutoTerminatedRelationships(splitByLine, Collections.singleton("success"));
         getClientUtil().createConnection(generate, splitByLine, "success");
 
-        getNifiClient().getProcessorClient().startProcessor(splitByLine);
+        getClientUtil().startProcessor(splitByLine);
 
         // Change parameter to an invalid value. This will result in the processor being stopped, becoming invalid, and then being transitioned to a 'starting' state while invalid.
         final ParameterContextUpdateRequestEntity updateToInvalidRequestEntity = updateParameterContext(contextEntity, "clone", "invalid");
@@ -575,8 +572,8 @@ public class ParameterContextIT extends NiFiSystemIT {
         final ProcessorEntity secondProcessorEntity = createProcessor(TEST_PROCESSORS_PACKAGE + ".CountEvents", NIFI_GROUP_ID, TEST_EXTENSIONS_ARTIFACT_ID, getNiFiVersion());
 
         // Start Processors
-        getNifiClient().getProcessorClient().startProcessor(processorEntity.getId(), processorEntity.getRevision().getClientId(), 1L);
-        getNifiClient().getProcessorClient().startProcessor(secondProcessorEntity.getId(), secondProcessorEntity.getRevision().getClientId(), 1L);
+        getClientUtil().startProcessor(processorEntity);
+        getClientUtil().startProcessor(secondProcessorEntity);
 
         Map<String, Long> counterValues = waitForCounter(processorEntity.getId(), "Scheduled", getNumberOfNodes());
         assertFalse(counterValues.containsKey("Stopped"));
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java
index 5c682b07f3..8fe795751c 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/BatchFlowBetweenGroupsIT.java
@@ -108,7 +108,7 @@ public class BatchFlowBetweenGroupsIT extends NiFiSystemIT {
         getClientUtil().startProcessGroupComponents(processGroupA.getId());
 
         // Start generate processor and wait for data to queue up. Then stop.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
         waitForQueueNotEmpty(generateToInputPortA.getId());
         getNifiClient().getProcessorClient().stopProcessor(generate);
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
index 99a28ec5a0..7c96e82edf 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/pg/SingleFlowFileConcurrencyIT.java
@@ -127,7 +127,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
         // Start generate so that data is created. Start Input Port so that the data is ingested.
         // Start Output Ports but not the Sleep processor. This will keep data queued up for the Sleep processor,
         // and that should prevent data from being transferred by Output Port "Out2" also.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
         getNifiClient().getInputPortClient().startInputPort(inputPort);
         getNifiClient().getOutputPortClient().startOutputPort(outputPort);
         getNifiClient().getOutputPortClient().startOutputPort(secondOut);
@@ -143,7 +143,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
         }
 
         // Start Sleep
-        getNifiClient().getProcessorClient().startProcessor(sleep);
+        getClientUtil().startProcessor(sleep);
 
         // Data should now flow from both output ports.
         waitForQueueCount(inputToSleep.getId(), 0);
@@ -192,7 +192,7 @@ public class SingleFlowFileConcurrencyIT extends NiFiSystemIT {
 
         // Start generate so that data is created. Start Input Port so that the data is ingested.
         // Start "Out" Output Ports but "Out2.". This will keep data queued up for the Out2 output port.
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
         getNifiClient().getInputPortClient().startInputPort(inputPort);
         getNifiClient().getOutputPortClient().startOutputPort(outputPort);
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
index 5665a46bf8..a167dfa261 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/provenance/ProvenanceRepositoryIT.java
@@ -65,8 +65,8 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
         getClientUtil().setAutoTerminatedRelationships(count, "success");
         getClientUtil().createConnection(generateFlowFile, count, "success");
 
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
-        getNifiClient().getProcessorClient().startProcessor(count);
+        getClientUtil().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(count);
 
         ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
         searchValueDto.setValue(generateFlowFile.getId());
@@ -102,7 +102,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
         getClientUtil().setAutoTerminatedRelationships(terminate, "success");
         getClientUtil().createConnection(generateFlowFile, terminate, "success");
 
-        generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
 
         ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
         searchValueDto.setValue(generateFlowFile.getId());
@@ -118,7 +118,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
         // The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
         // roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
         // depending on when the query is executed.
-        getNifiClient().getProcessorClient().startProcessor(terminate);
+        getClientUtil().startProcessor(terminate);
 
         ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
         terminateSearchValueDto.setValue(terminate.getId());
@@ -131,7 +131,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
 
         // Emit 25 more events
         getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "25"));
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
 
         // Wait for those 25 events to be emitted
         waitForEventCountAtLeast(generateSearchTerms, 25);
@@ -152,7 +152,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
         getClientUtil().setAutoTerminatedRelationships(terminate, "success");
         getClientUtil().createConnection(generateFlowFile, terminate, "success");
 
-        generateFlowFile = getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
 
         ProvenanceSearchValueDTO searchValueDto = new ProvenanceSearchValueDTO();
         searchValueDto.setValue(generateFlowFile.getId());
@@ -168,7 +168,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
         // The GenerateFlowFile would have 800. The first 200 events from Terminate will be in the first Event File, causing that one to
         // roll over and subsequently be aged off. The second Event File will hold the other 600. So we may have 600 or 800 events,
         // depending on when the query is executed.
-        getNifiClient().getProcessorClient().startProcessor(terminate);
+        getClientUtil().startProcessor(terminate);
 
         ProvenanceSearchValueDTO terminateSearchValueDto = new ProvenanceSearchValueDTO();
         terminateSearchValueDto.setValue(terminate.getId());
@@ -193,7 +193,7 @@ public class ProvenanceRepositoryIT extends NiFiSystemIT {
         // Emit 400 more events
         generateFlowFile.getRevision().setVersion(0L); // Reset the revision
         getClientUtil().updateProcessorProperties(generateFlowFile, Collections.singletonMap("Batch Size", "400"));
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
 
         // Since we restarted, the previous Event File will be rolled over. And since it will be > 1 KB in size, it will age off almost immediately.
         // This will leave us with only the 400 newly created events.
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
index bca8e1c628..b6ee614eb8 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/ContentAccessIT.java
@@ -97,11 +97,11 @@ public class ContentAccessIT extends NiFiSystemIT {
         final ConnectionEntity verifyToTerminateUnmatched = getClientUtil().createConnection(verify, terminateAa, "unmatched");
 
         // Run Generate processor, wait for its output
-        getNifiClient().getProcessorClient().startProcessor(generate);
+        getClientUtil().startProcessor(generate);
         waitForQueueCount(generateToSplit.getId(), 1);
 
         // Run split processor, wait for its output
-        getNifiClient().getProcessorClient().startProcessor(split);
+        getClientUtil().startProcessor(split);
         waitForQueueCount(splitToReverse.getId(), 3);
 
         // Verify output of the Split processor
@@ -122,7 +122,7 @@ public class ContentAccessIT extends NiFiSystemIT {
         assertTrue(splitContents.contains("{ a : c }"));
 
         // Start the reverse processor, wait for its output
-        getNifiClient().getProcessorClient().startProcessor(reverse);
+        getClientUtil().startProcessor(reverse);
         waitForQueueCount(reverseToVerify.getId(), 3);
 
         final String firstReversedContents = getClientUtil().getFlowFileContentAsUtf8(reverseToVerify.getId(), 0);
@@ -140,7 +140,7 @@ public class ContentAccessIT extends NiFiSystemIT {
 
         // Start verify processor. This is different than verify the contents above because doing so above is handled by making a REST call, which does not make use
         // of the ProcessSession. Using the VerifyContents processor ensures that the Processors see the same contents.
-        getNifiClient().getProcessorClient().startProcessor(verify);
+        getClientUtil().startProcessor(verify);
 
         waitForQueueCount(verifyToTerminateAa.getId(), 1);
         waitForQueueCount(verifyToTerminateBa.getId(), 1);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
index 747103cdb7..4a19a07c05 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/restart/FlowFileRestorationIT.java
@@ -50,7 +50,7 @@ public class FlowFileRestorationIT extends NiFiSystemIT {
         final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile");
         final ConnectionEntity connection = getClientUtil().createConnection(generator, terminate, "success");
 
-        getNifiClient().getProcessorClient().startProcessor(generator);
+        getClientUtil().startProcessor(generator);
         waitForQueueCount(connection.getId(), 1);
         getNifiClient().getProcessorClient().stopProcessor(generator);
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java
index d58e15aea8..045a2fe3a0 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/rpg/RemoteProcessGroupIT.java
@@ -102,7 +102,7 @@ public class RemoteProcessGroupIT extends NiFiSystemIT {
 
         getNifiClient().getInputPortClient().startInputPort(port);
         getClientUtil().waitForValidProcessor(generateFlowFile.getId());
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
         getNifiClient().getRemoteProcessGroupClient().startTransmitting(rpg);
 
         waitFor(() -> util.getQueueSize(generateToRPG.getId()).getObjectCount() == 0);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
index 3217a3ae2e..1fb428ca30 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/variables/ProcessGroupVariablesIT.java
@@ -50,7 +50,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT {
         getClientUtil().waitForValidProcessor(generateFlowFile.getId());
 
         // Start Processor, wait for 1 FlowFile to be queued up, then stop processor
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
         waitForQueueCount(connection.getId(), 1);
 
         final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
@@ -89,7 +89,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT {
         getClientUtil().waitForValidProcessor(generateFlowFile.getId());
 
         // Start Processor, wait for 1 FlowFile to be queued up, then stop processor
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
         waitForQueueCount(connection.getId(), 1);
 
         final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);
@@ -129,7 +129,7 @@ public class ProcessGroupVariablesIT extends NiFiSystemIT {
         getClientUtil().waitForValidProcessor(generateFlowFile.getId());
 
         // Start Processor, wait for 1 FlowFile to be queued up, then stop processor
-        getNifiClient().getProcessorClient().startProcessor(generateFlowFile);
+        getClientUtil().startProcessor(generateFlowFile);
         waitForQueueCount(connection.getId(), 1);
 
         final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), 0);