You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/05/19 18:14:51 UTC

[nifi] branch main updated: NIFI-10037: When system test fails to clean up flow, destroy the entire environment so that the next test starts in a healthy state. Name troubleshooting directories with the name of the test class to avoid ambiguity. Also added a log statement so that we know which test is running when looking at the log output from the tests themselves. Finally, found an issue in AbstractComponentNode in which we iterate over the elements in a Map and call setProperty, which can update the underlying Map [...]

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 38b51b0dde NIFI-10037: When system test fails to clean up flow, destroy the entire environment so that the next test starts in a healthy state. Name troubleshooting directories with the name of the test class to avoid ambiguity. Also added a log statement so that we know which test is running when looking at the log output from the tests themselves. Finally, found an issue in AbstractComponentNode in which we iterate over the elements in a Map and call setProperty, which can upda [...]
38b51b0dde is described below

commit 38b51b0dde24929563c4fc6c9a8c7a10e39ef713
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Thu May 19 11:58:52 2022 -0400

    NIFI-10037: When system test fails to clean up flow, destroy the entire environment so that the next test starts in a healthy state. Name troubleshooting directories with the name of the test class to avoid ambiguity. Also added a log statement so that we know which test is running when looking at the log output from the tests themselves. Finally, found an issue in AbstractComponentNode in which we iterate over the elements in a Map and call setProperty, which can update the underlyin [...]
    
    This closes #6059
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi/controller/AbstractComponentNode.java     |  6 ++-
 .../org/apache/nifi/tests/system/NiFiSystemIT.java |  8 ++++
 .../tests/system/TroubleshootingTestWatcher.java   | 11 +++--
 .../tests/system/loadbalance/LoadBalanceIT.java    | 54 ++++++++++++++++------
 .../resources/conf/clustered/node1/nifi.properties |  2 +-
 .../resources/conf/clustered/node2/bootstrap.conf  |  2 +-
 .../resources/conf/clustered/node2/nifi.properties |  2 +-
 .../test/resources/conf/default/nifi.properties    |  2 +-
 8 files changed, 64 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
index 59f9bd50c4..61d17eb4b8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractComponentNode.java
@@ -66,6 +66,7 @@ import java.net.URL;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
@@ -591,7 +592,10 @@ public abstract class AbstractComponentNode implements ComponentNode {
         // use setProperty instead of setProperties so we can bypass the class loading logic.
         // Consider value changed if it is different than the PropertyDescriptor's default value because we need to call the #onPropertiesModified
         // method on the component if the current value is not the default value, since the component itself is being reloaded.
-        for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry : this.properties.entrySet()) {
+        // Also, create a copy of this.properties instead of iterating directly over this.properties since the call to setProperty can change the
+        // underlying map, and the behavior of modifying the map while iterating over its elements is undefined.
+        final Map<PropertyDescriptor, PropertyConfiguration> copyOfPropertiesMap = new HashMap<>(this.properties);
+        for (final Map.Entry<PropertyDescriptor, PropertyConfiguration> entry : copyOfPropertiesMap.entrySet()) {
             final PropertyDescriptor propertyDescriptor = entry.getKey();
             final PropertyConfiguration configuration = entry.getValue();
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
index 341e384812..298e3ec473 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiSystemIT.java
@@ -73,6 +73,8 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
     @BeforeEach
     public void setup(final TestInfo testInfo) throws IOException {
         this.testInfo = testInfo;
+        final String testClassName = testInfo.getTestClass().map(Class::getSimpleName).orElse("<Unknown Test Class>");
+        logger.info("Beginning Test {}:{}", testClassName, testInfo.getDisplayName());
 
         Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader());
         setupClient();
@@ -116,6 +118,12 @@ public abstract class NiFiSystemIT implements NiFiInstanceProvider {
 
             if (isDestroyEnvironmentAfterEachTest()) {
                 cleanup();
+            } else if (destroyFlowFailure != null) {
+                // If unable to destroy the flow, we need to shutdown the instance and delete the flow and completely recreate the environment.
+                // Otherwise, we will be left in an unknown state for the next test, and that can cause cascading failures that are very difficult
+                // to understand and troubleshoot.
+                logger.info("Because there was a failure when destroying the flow, will completely tear down the environments and start with a clean environment for the next test.");
+                cleanup();
             }
 
             if (destroyFlowFailure != null) {
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
index 8d71362359..b2c1ab078f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/TroubleshootingTestWatcher.java
@@ -40,7 +40,8 @@ public class TroubleshootingTestWatcher implements TestWatcher {
                 final NiFiInstanceProvider provider = (NiFiInstanceProvider) testInstance;
                 final String displayName = context.getDisplayName();
                 try {
-                    final File dir = quarantineTroubleshootingInfo(provider, displayName, cause);
+                    final String testClassName = context.getTestClass().map(Class::getSimpleName).orElse("TestClassUnknown");
+                    final File dir = quarantineTroubleshootingInfo(provider, testClassName, displayName, cause);
                     logger.info("Test Failed [{}]: Troubleshooting information stored [{}]", displayName, dir.getAbsolutePath());
                 } catch (final Exception e) {
                     logger.error("Test Failed [{}]: Troubleshooting information not stored", displayName, e);
@@ -49,17 +50,21 @@ public class TroubleshootingTestWatcher implements TestWatcher {
         }
     }
 
-    private File quarantineTroubleshootingInfo(final NiFiInstanceProvider provider, final String methodName, final Throwable failureCause) throws IOException {
+    private File quarantineTroubleshootingInfo(final NiFiInstanceProvider provider, final String testClassName, final String methodName, final Throwable failureCause) throws IOException {
         NiFiInstance instance = provider.getNiFiInstance();
 
         // The teardown method may or may not have already run at this point. If it has, the instance will be null.
         // In that case, just create a new instance and use it - it will map to the same directories.
         if (instance == null) {
+            logger.warn("While capturing troubleshooting info for {}, the NiFi Instance is not available. Will create a new one for Diagnostics purposes, but some of the diagnostics may be less " +
+                "accurate, since it's not the same instance that ran the test", methodName);
+
             instance = provider.getInstanceFactory().createInstance();
         }
 
         final File troubleshooting = new File("target/troubleshooting");
-        final File quarantineDir = new File(troubleshooting, methodName);
+        final String quarantineDirName = testClassName + "-" + methodName.replace("()", "");
+        final File quarantineDir = new File(troubleshooting, quarantineDirName);
         quarantineDir.mkdirs();
 
         instance.quarantineTroubleshootingInfo(quarantineDir, failureCause);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
index 2e1b1c0698..5e27f24737 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/loadbalance/LoadBalanceIT.java
@@ -42,12 +42,13 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.LongSummaryStatistics;
 import java.util.Map;
 import java.util.Set;
 
-import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class LoadBalanceIT extends NiFiSystemIT {
     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -277,13 +278,13 @@ public class LoadBalanceIT extends NiFiSystemIT {
     private int getQueueSize(final String connectionId) {
         final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
         final ConnectionStatusDTO connectionStatusDto = statusEntity.getConnectionStatus();
-        return connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued().intValue();
+        return connectionStatusDto.getAggregateSnapshot().getFlowFilesQueued();
     }
 
     private long getQueueBytes(final String connectionId) {
         final ConnectionStatusEntity statusEntity = getConnectionStatus(connectionId);
         final ConnectionStatusDTO connectionStatusDto = statusEntity.getConnectionStatus();
-        return connectionStatusDto.getAggregateSnapshot().getBytesQueued().longValue();
+        return connectionStatusDto.getAggregateSnapshot().getBytesQueued();
     }
 
     private boolean isConnectionDoneLoadBalancing(final String connectionId) {
@@ -372,22 +373,45 @@ public class LoadBalanceIT extends NiFiSystemIT {
         instance2.start(true);
         waitForAllNodesConnected();
 
-        // Generate the data again
         generate = getNifiClient().getProcessorClient().getProcessor(generate.getId());
-        getNifiClient().getProcessorClient().startProcessor(generate);
 
-        // Wait until all 20 FlowFiles are queued up
-        waitFor(() -> {
-            final ConnectionStatusEntity secondRoundStatusEntity = getConnectionStatus(connection.getId());
-            return secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
-        });
+        // Generate data and wait for it to be spread across the cluster. We do this in an infinite while() loop because
+        // there can be a failure, in which case we'll retry. If that happens, we just want to keep retrying until the test
+        // times out.
+        while (true) {
+            // Generate the data.
+            getNifiClient().getProcessorClient().startProcessor(generate);
 
-        // Wait until load balancing is complete
-        waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+            // Wait until all 20 FlowFiles are queued up
+            waitFor(() -> {
+                final ConnectionStatusEntity secondRoundStatusEntity = getConnectionStatus(connection.getId());
+                return secondRoundStatusEntity.getConnectionStatus().getAggregateSnapshot().getFlowFilesQueued() == 20;
+            });
 
-        // Ensure that the FlowFiles are evenly distributed between the nodes.
-        final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = getConnectionStatus(connection.getId());
-        assertTrue(isEvenlyDistributed(afterSecondDataGenerationStatusEntity));
+            // Wait until load balancing is complete
+            waitFor(() -> isConnectionDoneLoadBalancing(connection.getId()));
+
+            // Log the distribution of data between nodes for easier troubleshooting in case there's a failure.
+            final ConnectionStatusEntity afterSecondDataGenerationStatusEntity = getConnectionStatus(connection.getId());
+            final List<NodeConnectionStatusSnapshotDTO> nodeSnapshots = afterSecondDataGenerationStatusEntity.getConnectionStatus().getNodeSnapshots();
+            logger.info("FlowFiles Queued Per Node:");
+            nodeSnapshots.forEach(snapshot ->
+                logger.info("{}:{} - {}", snapshot.getAddress(), snapshot.getApiPort(), snapshot.getStatusSnapshot().getFlowFilesQueued())
+            );
+
+            // Check if the FlowFiles are evenly distributed between the nodes. If so, we're done.
+            final boolean evenlyDistributed = isEvenlyDistributed(afterSecondDataGenerationStatusEntity);
+            if (evenlyDistributed) {
+                break;
+            }
+
+            // If there's an IOException thrown while communicating between the nodes, the data will be rebalanced and will go to
+            // the local partition. There's nothing we can do about that in this test. However, we can verify that NiFi recovers
+            // from this and continues to distribute data. To do that, we will stop the processor so that it can be started again
+            // (and produce more data) and we can empty the queue so that we know how much data to expect.
+            getNifiClient().getProcessorClient().stopProcessor(generate);
+            getClientUtil().emptyQueue(connection.getId());
+        }
 
         assertEquals(20, getQueueSize(connection.getId()));
         assertEquals(20 * 1024 * 1024, getQueueBytes(connection.getId()));
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
index 4544f2fc3b..049ac178ef 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node1/nifi.properties
@@ -77,7 +77,7 @@ nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
 nifi.content.claim.max.appendable.size=50 KB
 nifi.content.repository.directory.default=./content_repository
 nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
 nifi.content.repository.archive.enabled=true
 nifi.content.repository.always.sync=false
 nifi.content.viewer.url=../nifi-content-viewer/
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
index 930e9449db..80bd3ed93d 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/bootstrap.conf
@@ -27,7 +27,7 @@ java.arg.3=-Xmx512m
 
 java.arg.14=-Djava.awt.headless=true
 
-java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8003
+#java.arg.debug=-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8003
 
 java.arg.nodeNum=-DnodeNumber=2
 
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
index acd5c6707c..4b7f644058 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/clustered/node2/nifi.properties
@@ -77,7 +77,7 @@ nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
 nifi.content.claim.max.appendable.size=50 KB
 nifi.content.repository.directory.default=./content_repository
 nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
 nifi.content.repository.archive.enabled=true
 nifi.content.repository.always.sync=false
 nifi.content.viewer.url=../nifi-content-viewer/
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
index 7b3de1452f..7c6426c67b 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/resources/conf/default/nifi.properties
@@ -77,7 +77,7 @@ nifi.content.repository.implementation=org.apache.nifi.controller.repository.Fil
 nifi.content.claim.max.appendable.size=50 KB
 nifi.content.repository.directory.default=./content_repository
 nifi.content.repository.archive.max.retention.period=12 hours
-nifi.content.repository.archive.max.usage.percentage=50%
+nifi.content.repository.archive.max.usage.percentage=90%
 nifi.content.repository.archive.enabled=true
 nifi.content.repository.always.sync=false
 nifi.content.viewer.url=../nifi-content-viewer/