You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/07/25 15:04:16 UTC

[nifi] branch main updated: NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 0497907829 NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services
0497907829 is described below

commit 0497907829ca25523622c021073b90c1aa9a3094
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jul 22 16:13:15 2022 -0400

    NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services
    
    This closes #6236
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../service/StandardControllerServiceNode.java     |   8 +-
 .../service/StandardControllerServiceProvider.java | 126 +++++++++++++--------
 .../cs/tests/system/LifecycleFailureService.java   |  78 +++++++++++++
 .../org.apache.nifi.controller.ControllerService   |   1 +
 .../apache/nifi/tests/system/NiFiClientUtil.java   |  18 +++
 .../system/clustering/FlowSynchronizationIT.java   |   1 -
 .../ControllerServiceLifecycleIT.java              |  57 ++++++++++
 7 files changed, 236 insertions(+), 53 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0da7c98884..02189a1579 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -324,7 +324,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
     @Override
     public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
         if (!this.isActive()) {
-            throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation().getIdentifier() + " because it is not enabled");
+            return;
         }
 
         final ControllerServiceReference references = getReferences();
@@ -608,16 +608,12 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
 
                         final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
                         final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
-                        componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
-                        LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+                        componentLog.error("Failed to invoke @OnEnabled method", cause);
                         invokeDisable(configContext);
 
                         if (isActive()) {
                             scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
                         } else {
-                            try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
-                                ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
-                            }
                             stateTransition.disable();
                         }
                     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 414d629f0d..735414d376 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -48,9 +48,11 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
 import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
@@ -224,12 +226,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         if (shouldStart) {
             for (ControllerServiceNode controllerServiceNode : serviceNodes) {
                 try {
-                    if (!controllerServiceNode.isActive()) {
-                        final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
+                    final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
 
-                        future.get(30, TimeUnit.SECONDS);
-                        logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
-                    }
+                    future.get(30, TimeUnit.SECONDS);
+                    logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
                 } catch (final ControllerServiceNotValidException csnve) {
                     logger.warn("Failed to enable service {} because it is not currently valid", controllerServiceNode);
                 } catch (Exception e) {
@@ -247,14 +247,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         processScheduler.submitFrameworkTask(() -> {
-            enableControllerServices(serviceNodes, future);
-            future.complete(null);
+            try {
+                enableControllerServices(serviceNodes, future);
+                future.complete(null);
+            } catch (final Exception e) {
+                future.completeExceptionally(e);
+            }
         });
 
         return future;
     }
 
-    private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
+    private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) throws Exception {
+        Exception firstFailure = null;
+
         // validate that we are able to start all of the services.
         for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
             if (completableFuture.isCancelled()) {
@@ -262,29 +268,37 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             }
 
             try {
-                if (!controllerServiceNode.isActive()) {
-                    final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
+                // If service is already active, just move on to the next
+                if (controllerServiceNode.isActive()) {
+                    continue;
+                }
 
-                    while (true) {
-                        try {
-                            future.get(1, TimeUnit.SECONDS);
-                            logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
-                            break;
-                        } catch (final TimeoutException e) {
-                            if (completableFuture.isCancelled()) {
-                                return;
-                            }
-                        } catch (final Exception e) {
-                            logger.warn("Failed to enable service {}", controllerServiceNode, e);
-                            completableFuture.completeExceptionally(e);
-
-                            if (this.bulletinRepo != null) {
-                                this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
-                                    Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
-                            }
+                final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
 
+                // Wait for the future to complete. But if the completableFuture ever is canceled, we want to stop waiting and return.
+                while (true) {
+                    try {
+                        future.get(1, TimeUnit.SECONDS);
+                        logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
+                        break;
+                    } catch (final TimeoutException e) {
+                        if (completableFuture.isCancelled()) {
                             return;
                         }
+                    } catch (final Exception e) {
+                        logger.warn("Failed to enable service {}", controllerServiceNode, e);
+                        if (firstFailure == null) {
+                            firstFailure = e;
+                        } else {
+                            firstFailure.addSuppressed(e);
+                        }
+
+                        if (this.bulletinRepo != null) {
+                            this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
+                                Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
+                        }
+
+                        break;
                     }
                 }
             } catch (Exception e) {
@@ -295,6 +309,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
                 }
             }
         }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
     }
 
     @Override
@@ -382,14 +400,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
         final CompletableFuture<Void> future = new CompletableFuture<>();
         processScheduler.submitFrameworkTask(() -> {
-            disableControllerServices(serviceNodes, future);
-            future.complete(null);
+            try {
+                disableControllerServices(serviceNodes, future);
+                future.complete(null);
+            } catch (final Exception e) {
+                future.completeExceptionally(e);
+            }
         });
 
         return future;
     }
 
-    private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) {
+    private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) throws Exception {
         final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes);
 
         // Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection
@@ -406,24 +428,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             }
         }
 
+        Exception firstFailure = null;
         for (final ControllerServiceNode serviceNode : serviceNodes) {
             if (serviceNode.isActive()) {
-                disableReferencingServices(serviceNode);
-                final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
-
-                while (true) {
-                    try {
-                        serviceFuture.get(1, TimeUnit.SECONDS);
-                        break;
-                    } catch (final TimeoutException e) {
-                        if (future.isCancelled()) {
-                            return;
-                        }
-
-                        continue;
-                    } catch (final Exception e) {
-                        logger.error("Failed to disable {}", serviceNode, e);
-                        future.completeExceptionally(e);
+                try {
+                    disableControllerServiceAndReferencingServices(serviceNode, future::isCancelled);
+                } catch (final Exception e) {
+                    if (firstFailure == null) {
+                        firstFailure = e;
+                    } else {
+                        firstFailure.addSuppressed(e);
                     }
                 }
             } else {
@@ -438,6 +452,26 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
                 }
             }
         }
+
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    private void disableControllerServiceAndReferencingServices(final ControllerServiceNode serviceNode, final BooleanSupplier cancelSupplier) throws ExecutionException, InterruptedException {
+        disableReferencingServices(serviceNode);
+        final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
+
+        while (true) {
+            try {
+                serviceFuture.get(1, TimeUnit.SECONDS);
+                break;
+            } catch (final TimeoutException e) {
+                if (cancelSupplier.getAsBoolean()) {
+                    return;
+                }
+            }
+        }
     }
 
     @Override
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
new file mode 100644
index 0000000000..59df1c404f
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LifecycleFailureService extends AbstractControllerService {
+    static final PropertyDescriptor ENABLE_FAILURE_COUNT = new PropertyDescriptor.Builder()
+        .name("Enable Failure Count")
+        .description("How many times the CS should fail to enable before succeeding")
+        .required(true)
+        .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+        .defaultValue("0")
+        .build();
+
+    static final PropertyDescriptor FAIL_ON_DISABLE = new PropertyDescriptor.Builder()
+        .name("Fail on Disable")
+        .displayName("Fail on Disable")
+        .description("Whether or not hte Controller Service should fail when disabled")
+        .required(true)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .build();
+
+    private final AtomicInteger invocationCount = new AtomicInteger(0);
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return Arrays.asList(ENABLE_FAILURE_COUNT, FAIL_ON_DISABLE);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) {
+        final int maxFailureCount = context.getProperty(ENABLE_FAILURE_COUNT).asInteger();
+        final int currentInvocationCount = invocationCount.getAndIncrement();
+        if (currentInvocationCount >= maxFailureCount) {
+            getLogger().info("Enabling successfully because invocation count is {}", currentInvocationCount);
+            return;
+        }
+
+        getLogger().info("Will fail to enable because invocation count is {}", currentInvocationCount);
+        throw new RuntimeException("Failing to enable because configured to fail " + maxFailureCount + " times and current failure count is only " + currentInvocationCount);
+    }
+
+    @OnDisabled
+    public void onDisabled(final ConfigurationContext context) {
+        if (context.getProperty(FAIL_ON_DISABLE).asBoolean()) {
+            getLogger().info("Throwing Exception in onDisabled as configured");
+            throw new RuntimeException("Failing to disable because configured to fail on disable");
+        }
+
+        getLogger().info("Completing onDisabled successfully as configured");
+    }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 10f6db7935..01e184617a 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,6 +15,7 @@
 
 org.apache.nifi.cs.tests.system.EnsureControllerServiceConfigurationCorrect
 org.apache.nifi.cs.tests.system.FakeControllerService1
+org.apache.nifi.cs.tests.system.LifecycleFailureService
 org.apache.nifi.cs.tests.system.SensitiveDynamicPropertiesService
 org.apache.nifi.cs.tests.system.StandardCountService
 org.apache.nifi.cs.tests.system.StandardSleepService
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 15512cc694..38e8a83fce 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -550,6 +550,20 @@ public class NiFiClientUtil {
         return nifiClient.getControllerServicesClient().updateControllerService(entity);
     }
 
+    public ActivateControllerServicesEntity enableControllerServices(final String groupId, final boolean waitForEnabled) throws NiFiClientException, IOException {
+        final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity();
+        activateControllerServicesEntity.setId(groupId);
+        activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_ENABLED);
+        activateControllerServicesEntity.setDisconnectedNodeAcknowledged(true);
+
+        final ActivateControllerServicesEntity activateControllerServices = nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
+        if (waitForEnabled) {
+            waitForControllerSerivcesEnabled(groupId);
+        }
+
+        return activateControllerServices;
+    }
+
     public ControllerServiceEntity enableControllerService(final ControllerServiceEntity entity) throws NiFiClientException, IOException {
         final ControllerServiceRunStatusEntity runStatusEntity = new ControllerServiceRunStatusEntity();
         runStatusEntity.setState("ENABLED");
@@ -732,6 +746,10 @@ public class NiFiClientUtil {
         waitForControllerServiceState(groupId, "ENABLED", Arrays.asList(serviceIdsOfInterest));
     }
 
+    public void waitForControllerSerivcesEnabled(final String groupId, final List<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
+        waitForControllerServiceState(groupId, "ENABLED", serviceIdsOfInterest);
+    }
+
     public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
         while (true) {
             final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index faf1409b9f..ba8a606a6f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -157,7 +157,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
             final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i);
             assertEquals("updated", flowFile.getFlowFile().getAttributes().get("attr"));
         }
-
     }
 
     @Test
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
new file mode 100644
index 0000000000..6295f6ee24
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ControllerServiceLifecycleIT extends NiFiSystemIT {
+    @Test
+    public void testControllerServiceFailingToEnableAllowsOthersToEnable() throws NiFiClientException, IOException {
+        for (int i=0; i < 12; i++) {
+            ControllerServiceEntity failureService = getClientUtil().createControllerService("LifecycleFailureService");
+            getClientUtil().updateControllerServiceProperties(failureService, Collections.singletonMap("Enable Failure Count", "1000"));
+        }
+
+        final List<String> countServiceIds = new ArrayList<>();
+        for (int i=0; i < 12; i++) {
+            ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService");
+            countServiceIds.add(countService.getId());
+        }
+
+        getClientUtil().enableControllerServices("root", false);
+        getClientUtil().waitForControllerSerivcesEnabled("root", countServiceIds);
+    }
+
+    @Test
+    public void testControllerServiceEnableFailureCausesRetry() throws NiFiClientException, IOException {
+        ControllerServiceEntity service = getClientUtil().createControllerService("LifecycleFailureService");
+        getClientUtil().updateControllerServiceProperties(service, Collections.singletonMap("Enable Failure Count", "1"));
+
+        getClientUtil().enableControllerServices("root", false);
+        getClientUtil().waitForControllerSerivcesEnabled("root");
+    }
+
+}