You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/07/25 15:04:16 UTC
[nifi] branch main updated: NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0497907829 NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services
0497907829 is described below
commit 0497907829ca25523622c021073b90c1aa9a3094
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Fri Jul 22 16:13:15 2022 -0400
NIFI-10262: Ensure that when an Exception is thrown from a Controller Service's @OnEnabled method that we properly handle that Exception and continue enabling the other services in the given collection of services
This closes #6236
Signed-off-by: David Handermann <ex...@apache.org>
---
.../service/StandardControllerServiceNode.java | 8 +-
.../service/StandardControllerServiceProvider.java | 126 +++++++++++++--------
.../cs/tests/system/LifecycleFailureService.java | 78 +++++++++++++
.../org.apache.nifi.controller.ControllerService | 1 +
.../apache/nifi/tests/system/NiFiClientUtil.java | 18 +++
.../system/clustering/FlowSynchronizationIT.java | 1 -
.../ControllerServiceLifecycleIT.java | 57 ++++++++++
7 files changed, 236 insertions(+), 53 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0da7c98884..02189a1579 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -324,7 +324,7 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
@Override
public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
if (!this.isActive()) {
- throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation().getIdentifier() + " because it is not enabled");
+ return;
}
final ControllerServiceReference references = getReferences();
@@ -608,16 +608,12 @@ public class StandardControllerServiceNode extends AbstractComponentNode impleme
final Throwable cause = e instanceof InvocationTargetException ? e.getCause() : e;
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
- componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
- LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
+ componentLog.error("Failed to invoke @OnEnabled method", cause);
invokeDisable(configContext);
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
} else {
- try (final NarCloseable nc = NarCloseable.withComponentNarLoader(getExtensionManager(), getControllerServiceImplementation().getClass(), getIdentifier())) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, getControllerServiceImplementation(), configContext);
- }
stateTransition.disable();
}
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 414d629f0d..735414d376 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -48,9 +48,11 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -224,12 +226,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
if (shouldStart) {
for (ControllerServiceNode controllerServiceNode : serviceNodes) {
try {
- if (!controllerServiceNode.isActive()) {
- final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
+ final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
- future.get(30, TimeUnit.SECONDS);
- logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
- }
+ future.get(30, TimeUnit.SECONDS);
+ logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
} catch (final ControllerServiceNotValidException csnve) {
logger.warn("Failed to enable service {} because it is not currently valid", controllerServiceNode);
} catch (Exception e) {
@@ -247,14 +247,20 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Future<Void> enableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
final CompletableFuture<Void> future = new CompletableFuture<>();
processScheduler.submitFrameworkTask(() -> {
- enableControllerServices(serviceNodes, future);
- future.complete(null);
+ try {
+ enableControllerServices(serviceNodes, future);
+ future.complete(null);
+ } catch (final Exception e) {
+ future.completeExceptionally(e);
+ }
});
return future;
}
- private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) {
+ private void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> completableFuture) throws Exception {
+ Exception firstFailure = null;
+
// validate that we are able to start all of the services.
for (final ControllerServiceNode controllerServiceNode : serviceNodes) {
if (completableFuture.isCancelled()) {
@@ -262,29 +268,37 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
try {
- if (!controllerServiceNode.isActive()) {
- final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
+ // If service is already active, just move on to the next
+ if (controllerServiceNode.isActive()) {
+ continue;
+ }
- while (true) {
- try {
- future.get(1, TimeUnit.SECONDS);
- logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
- break;
- } catch (final TimeoutException e) {
- if (completableFuture.isCancelled()) {
- return;
- }
- } catch (final Exception e) {
- logger.warn("Failed to enable service {}", controllerServiceNode, e);
- completableFuture.completeExceptionally(e);
-
- if (this.bulletinRepo != null) {
- this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
- Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
- }
+ final Future<Void> future = enableControllerServiceAndDependencies(controllerServiceNode);
+ // Wait for the future to complete. But if the completableFuture ever is canceled, we want to stop waiting and return.
+ while (true) {
+ try {
+ future.get(1, TimeUnit.SECONDS);
+ logger.debug("Successfully enabled {}; service state = {}", controllerServiceNode, controllerServiceNode.getState());
+ break;
+ } catch (final TimeoutException e) {
+ if (completableFuture.isCancelled()) {
return;
}
+ } catch (final Exception e) {
+ logger.warn("Failed to enable service {}", controllerServiceNode, e);
+ if (firstFailure == null) {
+ firstFailure = e;
+ } else {
+ firstFailure.addSuppressed(e);
+ }
+
+ if (this.bulletinRepo != null) {
+ this.bulletinRepo.addBulletin(BulletinFactory.createBulletin("Controller Service",
+ Severity.ERROR.name(), "Could not enable " + controllerServiceNode + " due to " + e));
+ }
+
+ break;
}
}
} catch (Exception e) {
@@ -295,6 +309,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
}
+
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
}
@Override
@@ -382,14 +400,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Future<Void> disableControllerServicesAsync(final Collection<ControllerServiceNode> serviceNodes) {
final CompletableFuture<Void> future = new CompletableFuture<>();
processScheduler.submitFrameworkTask(() -> {
- disableControllerServices(serviceNodes, future);
- future.complete(null);
+ try {
+ disableControllerServices(serviceNodes, future);
+ future.complete(null);
+ } catch (final Exception e) {
+ future.completeExceptionally(e);
+ }
});
return future;
}
- private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) {
+ private void disableControllerServices(final Collection<ControllerServiceNode> serviceNodes, final CompletableFuture<Void> future) throws Exception {
final Set<ControllerServiceNode> serviceNodeSet = new HashSet<>(serviceNodes);
// Verify that for each Controller Service given, any service that references it is either disabled or is also in the given collection
@@ -406,24 +428,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
+ Exception firstFailure = null;
for (final ControllerServiceNode serviceNode : serviceNodes) {
if (serviceNode.isActive()) {
- disableReferencingServices(serviceNode);
- final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
-
- while (true) {
- try {
- serviceFuture.get(1, TimeUnit.SECONDS);
- break;
- } catch (final TimeoutException e) {
- if (future.isCancelled()) {
- return;
- }
-
- continue;
- } catch (final Exception e) {
- logger.error("Failed to disable {}", serviceNode, e);
- future.completeExceptionally(e);
+ try {
+ disableControllerServiceAndReferencingServices(serviceNode, future::isCancelled);
+ } catch (final Exception e) {
+ if (firstFailure == null) {
+ firstFailure = e;
+ } else {
+ firstFailure.addSuppressed(e);
}
}
} else {
@@ -438,6 +452,26 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
}
+
+ if (firstFailure != null) {
+ throw firstFailure;
+ }
+ }
+
+ private void disableControllerServiceAndReferencingServices(final ControllerServiceNode serviceNode, final BooleanSupplier cancelSupplier) throws ExecutionException, InterruptedException {
+ disableReferencingServices(serviceNode);
+ final CompletableFuture<?> serviceFuture = disableControllerService(serviceNode);
+
+ while (true) {
+ try {
+ serviceFuture.get(1, TimeUnit.SECONDS);
+ break;
+ } catch (final TimeoutException e) {
+ if (cancelSupplier.getAsBoolean()) {
+ return;
+ }
+ }
+ }
}
@Override
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
new file mode 100644
index 0000000000..59df1c404f
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/java/org/apache/nifi/cs/tests/system/LifecycleFailureService.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.cs.tests.system;
+
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class LifecycleFailureService extends AbstractControllerService {
+ static final PropertyDescriptor ENABLE_FAILURE_COUNT = new PropertyDescriptor.Builder()
+ .name("Enable Failure Count")
+ .description("How many times the CS should fail to enable before succeeding")
+ .required(true)
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
+ .defaultValue("0")
+ .build();
+
+ static final PropertyDescriptor FAIL_ON_DISABLE = new PropertyDescriptor.Builder()
+ .name("Fail on Disable")
+ .displayName("Fail on Disable")
+ .description("Whether or not hte Controller Service should fail when disabled")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .build();
+
+ private final AtomicInteger invocationCount = new AtomicInteger(0);
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return Arrays.asList(ENABLE_FAILURE_COUNT, FAIL_ON_DISABLE);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) {
+ final int maxFailureCount = context.getProperty(ENABLE_FAILURE_COUNT).asInteger();
+ final int currentInvocationCount = invocationCount.getAndIncrement();
+ if (currentInvocationCount >= maxFailureCount) {
+ getLogger().info("Enabling successfully because invocation count is {}", currentInvocationCount);
+ return;
+ }
+
+ getLogger().info("Will fail to enable because invocation count is {}", currentInvocationCount);
+ throw new RuntimeException("Failing to enable because configured to fail " + maxFailureCount + " times and current failure count is only " + currentInvocationCount);
+ }
+
+ @OnDisabled
+ public void onDisabled(final ConfigurationContext context) {
+ if (context.getProperty(FAIL_ON_DISABLE).asBoolean()) {
+ getLogger().info("Throwing Exception in onDisabled as configured");
+ throw new RuntimeException("Failing to disable because configured to fail on disable");
+ }
+
+ getLogger().info("Completing onDisabled successfully as configured");
+ }
+}
diff --git a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 10f6db7935..01e184617a 100644
--- a/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-system-tests/nifi-system-test-extensions-bundle/nifi-system-test-extensions/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,6 +15,7 @@
org.apache.nifi.cs.tests.system.EnsureControllerServiceConfigurationCorrect
org.apache.nifi.cs.tests.system.FakeControllerService1
+org.apache.nifi.cs.tests.system.LifecycleFailureService
org.apache.nifi.cs.tests.system.SensitiveDynamicPropertiesService
org.apache.nifi.cs.tests.system.StandardCountService
org.apache.nifi.cs.tests.system.StandardSleepService
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
index 15512cc694..38e8a83fce 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/NiFiClientUtil.java
@@ -550,6 +550,20 @@ public class NiFiClientUtil {
return nifiClient.getControllerServicesClient().updateControllerService(entity);
}
+ public ActivateControllerServicesEntity enableControllerServices(final String groupId, final boolean waitForEnabled) throws NiFiClientException, IOException {
+ final ActivateControllerServicesEntity activateControllerServicesEntity = new ActivateControllerServicesEntity();
+ activateControllerServicesEntity.setId(groupId);
+ activateControllerServicesEntity.setState(ActivateControllerServicesEntity.STATE_ENABLED);
+ activateControllerServicesEntity.setDisconnectedNodeAcknowledged(true);
+
+ final ActivateControllerServicesEntity activateControllerServices = nifiClient.getFlowClient().activateControllerServices(activateControllerServicesEntity);
+ if (waitForEnabled) {
+ waitForControllerSerivcesEnabled(groupId);
+ }
+
+ return activateControllerServices;
+ }
+
public ControllerServiceEntity enableControllerService(final ControllerServiceEntity entity) throws NiFiClientException, IOException {
final ControllerServiceRunStatusEntity runStatusEntity = new ControllerServiceRunStatusEntity();
runStatusEntity.setState("ENABLED");
@@ -732,6 +746,10 @@ public class NiFiClientUtil {
waitForControllerServiceState(groupId, "ENABLED", Arrays.asList(serviceIdsOfInterest));
}
+ public void waitForControllerSerivcesEnabled(final String groupId, final List<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
+ waitForControllerServiceState(groupId, "ENABLED", serviceIdsOfInterest);
+ }
+
public void waitForControllerServiceState(final String groupId, final String desiredState, final Collection<String> serviceIdsOfInterest) throws NiFiClientException, IOException {
while (true) {
final List<ControllerServiceEntity> nonDisabledServices = getControllerServicesNotInState(groupId, desiredState, serviceIdsOfInterest);
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
index faf1409b9f..ba8a606a6f 100644
--- a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/clustering/FlowSynchronizationIT.java
@@ -157,7 +157,6 @@ public class FlowSynchronizationIT extends NiFiSystemIT {
final FlowFileEntity flowFile = getClientUtil().getQueueFlowFile(connection.getId(), i);
assertEquals("updated", flowFile.getFlowFile().getAttributes().get("attr"));
}
-
}
@Test
diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
new file mode 100644
index 0000000000..6295f6ee24
--- /dev/null
+++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/controllerservice/ControllerServiceLifecycleIT.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.controllerservice;
+
+import org.apache.nifi.tests.system.NiFiSystemIT;
+import org.apache.nifi.toolkit.cli.impl.client.nifi.NiFiClientException;
+import org.apache.nifi.web.api.entity.ControllerServiceEntity;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ControllerServiceLifecycleIT extends NiFiSystemIT {
+ @Test
+ public void testControllerServiceFailingToEnableAllowsOthersToEnable() throws NiFiClientException, IOException {
+ for (int i=0; i < 12; i++) {
+ ControllerServiceEntity failureService = getClientUtil().createControllerService("LifecycleFailureService");
+ getClientUtil().updateControllerServiceProperties(failureService, Collections.singletonMap("Enable Failure Count", "1000"));
+ }
+
+ final List<String> countServiceIds = new ArrayList<>();
+ for (int i=0; i < 12; i++) {
+ ControllerServiceEntity countService = getClientUtil().createControllerService("StandardCountService");
+ countServiceIds.add(countService.getId());
+ }
+
+ getClientUtil().enableControllerServices("root", false);
+ getClientUtil().waitForControllerSerivcesEnabled("root", countServiceIds);
+ }
+
+ @Test
+ public void testControllerServiceEnableFailureCausesRetry() throws NiFiClientException, IOException {
+ ControllerServiceEntity service = getClientUtil().createControllerService("LifecycleFailureService");
+ getClientUtil().updateControllerServiceProperties(service, Collections.singletonMap("Enable Failure Count", "1"));
+
+ getClientUtil().enableControllerServices("root", false);
+ getClientUtil().waitForControllerSerivcesEnabled("root");
+ }
+
+}