You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/02/15 18:14:11 UTC

[nifi] branch main updated: NIFI-9233 - Improve reliability of system integration tests (#5749)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 87cfd43  NIFI-9233 - Improve reliability of system integration tests (#5749)
87cfd43 is described below

commit 87cfd43f6f872e14e904d63fe74645d41aaa8a02
Author: greyp9 <gr...@users.noreply.github.com>
AuthorDate: Tue Feb 15 13:14:01 2022 -0500

    NIFI-9233 - Improve reliability of system integration tests (#5749)
    
    * NIFI-9233 - Improve reliability of system integration tests
---
 .../service/StandardControllerServiceNode.java        | 16 ++++++++++------
 .../apache/nifi/controller/AbstractComponentNode.java | 19 +++++++++++++++++++
 .../org/apache/nifi/controller/ComponentNode.java     |  2 ++
 .../clustered/server/ConnectionLoadBalanceServer.java |  5 ++++-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java    | 19 +++++++++++++++++++
 .../system/clustering/FlowSynchronizationIT.java      |  7 +++++--
 .../nifi/tests/system/clustering/OffloadIT.java       |  3 ++-
 .../nifi/tests/system/loadbalance/LoadBalanceIT.java  |  4 ++--
 8 files changed, 63 insertions(+), 12 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 8593201..f767bfc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -339,17 +339,21 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
     @Override
     public void verifyCanEnable() {
         final ControllerServiceState state = getState();
-        if (state != ControllerServiceState.DISABLED) {
-            throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
+        switch (state) {
+            case DISABLED:
+                return;
+            case DISABLING:
+                throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
+            default:
+                if (isReloadAdditionalResourcesNecessary()) {
+                    throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because additional resources are needed - it has a state of " + state);
+                }
         }
     }
 
     @Override
     public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
-        final ControllerServiceState state = getState();
-        if (state != ControllerServiceState.DISABLED) {
-            throw new IllegalStateException(getControllerServiceImplementation().getIdentifier() + " cannot be enabled because it is not disabled - it has a state of " + state);
-        }
+        verifyCanEnable();
     }
 
     @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 24be80e..943a3d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -627,6 +627,25 @@ public abstract class AbstractComponentNode implements ComponentNode {
 
     /**
      * Generates fingerprint for the additional urls and compares it with the previous
+     * fingerprint value.
+     */
+    @Override
+    public synchronized boolean isReloadAdditionalResourcesNecessary() {
+        // Components that don't have any PropertyDescriptors marked `dynamicallyModifiesClasspath`
+        // won't have the fingerprint i.e. will be null, in such cases do nothing
+        if (additionalResourcesFingerprint == null) {
+            return false;
+        }
+
+        final Set<PropertyDescriptor> descriptors = this.getProperties().keySet();
+        final Set<URL> additionalUrls = this.getAdditionalClasspathResources(descriptors);
+
+        final String newFingerprint = ClassLoaderUtils.generateAdditionalUrlsFingerprint(additionalUrls, determineClasloaderIsolationKey());
+        return (!StringUtils.equals(additionalResourcesFingerprint, newFingerprint));
+    }
+
+    /**
+     * Generates fingerprint for the additional urls and compares it with the previous
      * fingerprint value. If the fingerprint values don't match, the function calls the
      * component's reload() to load the newly found resources.
      */
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
index 875f877..5316c1c 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ComponentNode.java
@@ -147,6 +147,8 @@ public interface ComponentNode extends ComponentAuthorizable {
 
     void verifyCanUpdateBundle(BundleCoordinate bundleCoordinate) throws IllegalStateException;
 
+    boolean isReloadAdditionalResourcesNecessary();
+
     void reloadAdditionalResourcesIfNecessary();
 
     void resetValidationState();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
index 97b08cd..3083a3b 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/queue/clustered/server/ConnectionLoadBalanceServer.java
@@ -34,6 +34,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLPeerUnverifiedException;
 import javax.net.ssl.SSLServerSocket;
+
+import org.apache.commons.io.IOUtils;
 import org.apache.nifi.events.EventReporter;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.security.util.CertificateUtils;
@@ -96,7 +98,7 @@ public class ConnectionLoadBalanceServer {
     }
 
     public void stop() {
-        stopped = false;
+        stopped = true;
 
         if (acceptConnection != null) {
             acceptConnection.stop();
@@ -148,6 +150,7 @@ public class ConnectionLoadBalanceServer {
 
         public void stop() {
             this.stopped = true;
+            IOUtils.closeQuietly(socket);
         }
 
         @Override
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 159d7ec..3e6248c 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -20,7 +20,9 @@ import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClient;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientConfig;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
 import org.apache.nifi.toolkit.cli.impl.client.nifi.impl.JerseyNiFiClient;
+import org.apache.nifi.web.api.dto.NodeDTO;
 import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
+import org.apache.nifi.web.api.entity.ClusterEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -36,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -192,6 +195,7 @@ public abstract class NiFiSystemIT {
                 final ClusteSummaryEntity clusterSummary = client.getFlowClient().getClusterSummary();
                 final int connectedNodeCount = clusterSummary.getClusterSummary().getConnectedNodeCount();
                 if (connectedNodeCount == expectedNumberOfNodes) {
+                    logger.info("Wait successful, {} nodes connected", expectedNumberOfNodes);
                     return;
                 }
 
@@ -326,6 +330,21 @@ public abstract class NiFiSystemIT {
         }
     }
 
+    protected void waitForNodeStatus(final NodeDTO nodeDto, final String status) throws InterruptedException {
+        waitFor(() -> {
+            try {
+                final ClusterEntity clusterEntity = getNifiClient().getControllerClient().getNodes();
+                final Collection<NodeDTO> nodes = clusterEntity.getCluster().getNodes();
+                final NodeDTO nodeDtoMatch = nodes.stream()
+                        .filter(n -> n.getApiPort().equals(nodeDto.getApiPort())).findFirst().get();
+                return nodeDtoMatch.getStatus().equals(status);
+            } catch (final Exception e) {
+                logger.error("Failed to determine node status", e);
+            }
+            return false;
+        });
+    }
+
     protected void waitForQueueNotEmpty(final String connectionId) throws InterruptedException {
         logger.info("Waiting for Queue on Connection {} to not be empty", connectionId);
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index 417047e..b7dcad1 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -303,7 +303,10 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
 
         final ControllerServiceEntity node2SleepService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(sleepService.getId());
         assertEquals(sleepService.getId(), node2SleepService.getId());
-        waitFor(() -> node2SleepService.getComponent().getState().equals(ENABLED_STATE));
+        waitFor(() -> {
+            final ControllerServiceEntity updatedNode2SleepService = getNifiClient().getControllerServicesClient(DO_NOT_REPLICATE).getControllerService(sleepService.getId());
+            return updatedNode2SleepService.getComponent().getState().equals(ENABLED_STATE);
+        });
 
         final ReportingTaskEntity node2ReportingTask = getNifiClient().getReportingTasksClient(DO_NOT_REPLICATE).getReportingTask(reportingTask.getId());
         waitFor(() -> node2ReportingTask.getComponent().getState().equals(RUNNING_STATE));
@@ -683,7 +686,7 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
         connection.setDisconnectedNodeAcknowledged(true);
 
         // Delete the CountFlowFiles processor, and countB and countC services, disable A.
-        getNifiClient().getProcessorClient().stopProcessor(countFlowFiles);
+        getClientUtil().stopProcessor(countFlowFiles);
         getNifiClient().getConnectionClient().deleteConnection(connection);
         getNifiClient().getProcessorClient().deleteProcessor(countFlowFiles);
         getClientUtil().disableControllerServices("root", true);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
index 1e66fc3..9553892 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/OffloadIT.java
@@ -75,10 +75,11 @@ public class OffloadIT extends NiFiSystemIT {
         final NodeDTO node2Dto = getNodeDTO(5672);
 
         disconnectNode(node2Dto);
+        waitForNodeStatus(node2Dto, "DISCONNECTED");
 
         final String nodeId = node2Dto.getNodeId();
         getClientUtil().offloadNode(nodeId);
-        waitFor(this::isNodeOffloaded);
+        waitForNodeStatus(node2Dto, "OFFLOADED");
 
         getClientUtil().connectNode(nodeId);
         waitForAllNodesConnected();
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 04716b9..b35d0b8 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -254,9 +254,9 @@ public class LoadBalanceIT extends NiFiSystemIT {
         final String nodeId = firstNodeDto.getNodeId();
 
         getClientUtil().disconnectNode(nodeId);
+        waitForNodeStatus(firstNodeDto, "DISCONNECTED");
         getClientUtil().offloadNode(nodeId);
-
-        waitFor(this::isNodeOffloaded);
+        waitForNodeStatus(firstNodeDto, "OFFLOADED");
 
         assertEquals(20, getQueueSize(connection.getId()));
         assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));