You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/02/11 17:12:05 UTC
incubator-nifi git commit: NIFI-250: Move activate/deactivate methods
for controller services' referencing components to ControllerServiceProvider
Repository: incubator-nifi
Updated Branches:
refs/heads/NIFI-250 ed4d22c1f -> 371e0100d
NIFI-250: Move activate/deactivate methods for controller services' referencing components to ControllerServiceProvider
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/371e0100
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/371e0100
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/371e0100
Branch: refs/heads/NIFI-250
Commit: 371e0100d35864f0092b133a7decac34ffb4be33
Parents: ed4d22c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Feb 11 11:11:49 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Feb 11 11:11:49 2015 -0500
----------------------------------------------------------------------
.../cluster/manager/impl/WebClusterManager.java | 12 +-
.../nifi/controller/ProcessScheduler.java | 13 ++
.../service/ControllerServiceNode.java | 24 ++-
.../service/ControllerServiceProvider.java | 24 +++
.../apache/nifi/controller/FlowController.java | 86 ++---------
.../scheduling/StandardProcessScheduler.java | 4 +
.../service/StandardControllerServiceNode.java | 45 ++++--
.../StandardControllerServiceProvider.java | 146 ++++++++++++++++++-
.../StandardControllerServiceReference.java | 5 +-
.../TestStandardControllerServiceProvider.java | 107 ++++++++++++++
.../nifi/controller/service/mock/ServiceA.java | 49 +++++++
.../nifi/controller/service/mock/ServiceB.java | 23 +++
12 files changed, 437 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index 8a431ad..ee3c621 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -357,7 +357,6 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
this.httpResponseMapper = httpResponseMapper;
this.dataFlowManagementService = dataFlowManagementService;
this.properties = properties;
- this.controllerServiceProvider = new StandardControllerServiceProvider();
this.bulletinRepository = new VolatileBulletinRepository();
this.instanceId = UUID.randomUUID().toString();
this.senderListener = senderListener;
@@ -410,6 +409,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, new QuartzSchedulingAgent(null, reportingTaskEngine, null, encryptor));
processScheduler.setMaxThreadCount(SchedulingStrategy.TIMER_DRIVEN, 10);
processScheduler.setMaxThreadCount(SchedulingStrategy.CRON_DRIVEN, 10);
+
+ controllerServiceProvider = new StandardControllerServiceProvider(processScheduler);
}
public void start() throws IOException {
@@ -1356,6 +1357,15 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
return controllerServiceProvider.getAllControllerServices();
}
+ @Override
+ public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.activateReferencingComponents(serviceNode);
+ }
+
+ @Override
+ public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.deactivateReferencingComponents(serviceNode);
+ }
private byte[] serialize(final Document doc) throws TransformerException {
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
index 303f540..724d1f2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ProcessScheduler.java
@@ -143,4 +143,17 @@ public interface ProcessScheduler {
* @param procNode
*/
void yield(ProcessorNode procNode);
+
+ /**
+ * Stops scheduling the given Reporting Task to run
+ * @param taskNode
+ */
+ void unschedule(ReportingTaskNode taskNode);
+
+ /**
+ * Begins scheduling the given Reporting Task to run
+ * @param taskNode
+ */
+ void schedule(ReportingTaskNode taskNode);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index d737d04..4822958 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,6 +16,8 @@
*/
package org.apache.nifi.controller.service;
+import java.util.Set;
+
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
@@ -27,7 +29,15 @@ public interface ControllerServiceNode extends ConfiguredComponent {
boolean isDisabled();
- void setDisabled(boolean disabled);
+ void enable();
+ void disable();
+
+ /**
+ * Disables the Controller Service, ignoring any active references that are in the provided set of
+ * referencing Controller Services when verifying that this service can be disabled
+ * @param ignoredReferences
+ */
+ void disable(Set<ControllerServiceNode> ignoredReferences);
ControllerServiceReference getReferences();
@@ -40,6 +50,18 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void verifyCanEnable();
void verifyCanDisable();
+
+ /**
+ * Verifies that this Controller Service can be disabled if the provided set of
+ * services are also disabled. This is introduced because we can have an instance
+ * where A references B, which references C, which references A and we want
+ * to disable service C. In this case, the cycle needs to not cause us to fail,
+ * so we want to verify that C can be disabled if A and B also are.
+ *
+ * @param ignoredReferences
+ */
+ void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences);
+
void verifyCanDelete();
void verifyCanUpdate();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 4074e89..7a767bf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -74,4 +74,28 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
* @return
*/
Set<ControllerServiceNode> getAllControllerServices();
+
+ /**
+ * Recursively stops all Processors and Reporting Tasks that are referencing the given Controller Service,
+ * as well as disabling any Controller Service that references this Controller Service (and stops
+ * all Reporting Task or Controller Service that is referencing it, and so on).
+ * @param serviceNode
+ */
+ void deactivateReferencingComponents(ControllerServiceNode serviceNode);
+
+ /**
+ * <p>
+ * Starts any enabled Processors and Reporting Tasks that are referencing this Controller Service. If other Controller
+ * Services reference this Controller Service, will also enable those services and 'activate' any components referencing
+ * them.
+ * </p>
+ *
+ * <p>
+ * NOTE: If any component cannot be started, an IllegalStateException will be thrown and no more components will
+ * be activated. This method provides no atomicity.
+ * </p>
+ *
+ * @param serviceNode
+ */
+ void activateReferencingComponents(ControllerServiceNode serviceNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 8bd1149..2825d5b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -104,7 +104,6 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
-import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
@@ -376,7 +375,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
this.properties = properties;
sslContext = SslContextFactory.createSslContext(properties, false);
extensionManager = new ExtensionManager();
- controllerServiceProvider = new StandardControllerServiceProvider();
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
@@ -400,6 +398,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler = new StandardProcessScheduler(this, this, encryptor);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
+ controllerServiceProvider = new StandardControllerServiceProvider(processScheduler);
final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
@@ -2571,82 +2570,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
return reportingTasks.values();
}
- /**
- * Recursively stops all Processors and Reporting Tasks that are referencing the given Controller Service,
- * as well as disabling any Controller Service that references this Controller Service (and stops
- * all Reporting Task or Controller Service that is referencing it, and so on).
- * @param serviceNode
- */
- public void deactiveReferencingComponents(final ControllerServiceNode serviceNode) {
- final ControllerServiceReference reference = serviceNode.getReferences();
-
- final Set<ConfiguredComponent> components = reference.getActiveReferences();
- for (final ConfiguredComponent component : components) {
- if ( component instanceof ControllerServiceNode ) {
- deactiveReferencingComponents((ControllerServiceNode) component);
-
- if (isControllerServiceEnabled(serviceNode.getIdentifier())) {
- disableControllerService(serviceNode);
- }
- } else if ( component instanceof ReportingTaskNode ) {
- final ReportingTaskNode taskNode = (ReportingTaskNode) component;
- if (taskNode.isRunning()) {
- stopReportingTask((ReportingTaskNode) component);
- }
- } else if ( component instanceof ProcessorNode ) {
- final ProcessorNode procNode = (ProcessorNode) component;
- if ( procNode.isRunning() ) {
- stopProcessor(procNode.getProcessGroup().getIdentifier(), procNode.getIdentifier());
- }
- }
- }
- }
-
- /**
- * <p>
- * Starts any enabled Processors and Reporting Tasks that are referencing this Controller Service. If other Controller
- * Services reference this Controller Service, will also enable those services and 'active' any components referencing
- * them.
- * </p>
- *
- * <p>
- * NOTE: If any component cannot be started, an IllegalStateException will be thrown an no more components will
- * be activated. This method provides no atomicity.
- * </p>
- *
- * @param serviceNode
- */
+ @Override
public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
- final ControllerServiceReference ref = serviceNode.getReferences();
- final Set<ConfiguredComponent> components = ref.getReferencingComponents();
-
- // First, activate any other controller services. We do this first so that we can
- // avoid the situation where Processor X depends on Controller Services Y and Z; and
- // Controller Service Y depends on Controller Service Z. In this case, if we first attempted
- // to start Processor X, we would fail because Controller Service Y is disabled. THis way, we
- // can recursively enable everything.
- for ( final ConfiguredComponent component : components ) {
- if (component instanceof ControllerServiceNode) {
- final ControllerServiceNode componentNode = (ControllerServiceNode) component;
- enableControllerService(componentNode);
- activateReferencingComponents(componentNode);
- }
- }
-
- for ( final ConfiguredComponent component : components ) {
- if (component instanceof ProcessorNode) {
- final ProcessorNode procNode = (ProcessorNode) component;
- if ( !procNode.isRunning() ) {
- startProcessor(procNode.getProcessGroup().getIdentifier(), procNode.getIdentifier());
- }
- } else if (component instanceof ReportingTaskNode) {
- final ReportingTaskNode taskNode = (ReportingTaskNode) component;
- if ( !taskNode.isRunning() ) {
- startReportingTask(taskNode);
- }
- }
- }
+ controllerServiceProvider.activateReferencingComponents(serviceNode);
}
@Override
@@ -2664,6 +2591,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler.disableReportingTask(reportingTaskNode);
}
+
+ @Override
+ public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.deactivateReferencingComponents(serviceNode);
+ }
+
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
@@ -2675,6 +2608,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
serviceNode.verifyCanDisable();
controllerServiceProvider.disableControllerService(serviceNode);
}
+
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 1627994..b6699d2 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -145,6 +145,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
componentLifeCycleThreadPool.shutdown();
}
+
+ @Override
public void schedule(final ReportingTaskNode taskNode) {
final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
if (scheduleState.isScheduled()) {
@@ -203,6 +205,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
}
+
+ @Override
public void unschedule(final ReportingTaskNode taskNode) {
final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
if (!scheduleState.isScheduled()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index ff4704d..b29f86b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.nifi.controller.service;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -60,22 +61,27 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
public boolean isDisabled() {
return disabled.get();
}
-
+
+
@Override
- public void setDisabled(final boolean disabled) {
- if (!disabled && !isValid()) {
+ public void enable() {
+ if ( !isValid() ) {
throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
}
-
- if (disabled) {
- // do not allow a Controller Service to be disabled if it's currently being used.
- final Set<ConfiguredComponent> runningRefs = getReferences().getActiveReferences();
- if (!runningRefs.isEmpty()) {
- throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running");
- }
- }
-
- this.disabled.set(disabled);
+
+ this.disabled.set(false);
+ }
+
+ @Override
+ public void disable() {
+ verifyCanDisable();
+ this.disabled.set(true);
+ }
+
+ @Override
+ public void disable(final Set<ControllerServiceNode> ignoredReferences) {
+ verifyCanDisable(ignoredReferences);
+ this.disabled.set(true);
}
@Override
@@ -161,10 +167,17 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDisable() {
+ verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
+ }
+
+ @Override
+ public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
final ControllerServiceReference references = getReferences();
- final int numRunning = references.getActiveReferences().size();
- if ( numRunning > 0 ) {
- throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
+
+ for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
+ if ( !ignoreReferences.contains(activeReference) ) {
+ throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 208da30..c584188 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -37,7 +37,11 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
@@ -56,6 +60,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
+ private final ProcessScheduler processScheduler;
private final ConcurrentMap<String, ControllerServiceNode> controllerServices;
private static final Set<Method> validDisabledMethods;
@@ -71,10 +76,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
validDisabledMethods = Collections.unmodifiableSet(validMethods);
}
- public StandardControllerServiceProvider() {
+ public StandardControllerServiceProvider(final ProcessScheduler scheduler) {
// the following 2 maps must be updated atomically, but we do not lock around them because they are modified
// only in the createControllerService method, and both are modified before the method returns
this.controllerServices = new ConcurrentHashMap<>();
+ this.processScheduler = scheduler;
}
private Class<?>[] getInterfaces(final Class<?> cls) {
@@ -106,8 +112,14 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader cl = ExtensionManager.getClassLoader(type);
- Thread.currentThread().setContextClassLoader(cl);
- final Class<?> rawClass = Class.forName(type, false, cl);
+ final Class<?> rawClass;
+ if ( cl == null ) {
+ rawClass = Class.forName(type);
+ } else {
+ Thread.currentThread().setContextClassLoader(cl);
+ rawClass = Class.forName(type, false, cl);
+ }
+
final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
final ControllerService originalService = controllerServiceClass.newInstance();
@@ -135,7 +147,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
};
- final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+ final ControllerService proxiedService;
+ if ( cl == null ) {
+ proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
+ } else {
+ proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+ }
logger.info("Create Controller Service of type {} with identifier {}", type, id);
originalService.initialize(new StandardControllerServiceInitializationContext(id, this));
@@ -174,7 +191,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext);
}
- serviceNode.setDisabled(false);
+ serviceNode.enable();
}
@Override
@@ -183,7 +200,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
// We must set the service to disabled before we invoke the OnDisabled methods because the service node
// can throw Exceptions if we attempt to disable the service while it's known to be in use.
- serviceNode.setDisabled(true);
+ serviceNode.disable();
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
@@ -263,4 +280,121 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<ControllerServiceNode> getAllControllerServices() {
return new HashSet<>(controllerServices.values());
}
+
+ @Override
+ public void deactivateReferencingComponents(final ControllerServiceNode serviceNode) {
+ deactivateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>());
+ }
+
+ private void deactivateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
+ final ControllerServiceReference reference = serviceNode.getReferences();
+
+ final Set<ConfiguredComponent> components = reference.getActiveReferences();
+ for (final ConfiguredComponent component : components) {
+ if ( component instanceof ControllerServiceNode ) {
+ // If we've already visited this component (there is a loop such that
+ // we are disabling Controller Service A, but B depends on A and A depends on B)
+ // we don't need to disable this component because it will be disabled after we return
+ if ( visited.contains(component) ) {
+ continue;
+ }
+
+ visited.add(serviceNode);
+ deactivateReferencingComponents((ControllerServiceNode) component, visited);
+
+ if (isControllerServiceEnabled(serviceNode.getIdentifier())) {
+ serviceNode.verifyCanDisable(visited);
+ serviceNode.disable(visited);
+ }
+ } else if ( component instanceof ReportingTaskNode ) {
+ final ReportingTaskNode taskNode = (ReportingTaskNode) component;
+ if (taskNode.isRunning()) {
+ taskNode.verifyCanStop();
+ processScheduler.unschedule(taskNode);
+ }
+ } else if ( component instanceof ProcessorNode ) {
+ final ProcessorNode procNode = (ProcessorNode) component;
+ if ( procNode.isRunning() ) {
+ procNode.getProcessGroup().stopProcessor(procNode);
+ }
+ }
+ }
+ }
+
+
+ @Override
+ public void activateReferencingComponents(final ControllerServiceNode serviceNode) {
+ activateReferencingComponents(serviceNode, new HashSet<ControllerServiceNode>());
+ }
+
+
+ /**
+ * Recursively enables every controller service that the given controller service references; the given service itself is not enabled here.
+ * @param serviceNode
+ */
+ private void activateReferencedComponents(final ControllerServiceNode serviceNode) {
+ for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
+ final PropertyDescriptor key = entry.getKey();
+ if ( key.getControllerServiceDefinition() == null ) {
+ continue;
+ }
+
+ final String serviceId = entry.getValue() == null ? key.getDefaultValue() : entry.getValue();
+ if ( serviceId == null ) {
+ continue;
+ }
+
+ final ControllerServiceNode referencedNode = getControllerServiceNode(serviceId);
+ if ( referencedNode == null ) {
+ throw new IllegalStateException("Cannot activate referenced component of " + serviceNode + " because no service exists with ID " + serviceId);
+ }
+
+ activateReferencedComponents(referencedNode);
+
+ if ( referencedNode.isDisabled() ) {
+ enableControllerService(referencedNode);
+ }
+ }
+ }
+
+ private void activateReferencingComponents(final ControllerServiceNode serviceNode, final Set<ControllerServiceNode> visited) {
+ if ( serviceNode.isDisabled() ) {
+ throw new IllegalStateException("Cannot activate referencing components of " + serviceNode.getControllerServiceImplementation() + " because the Controller Service is disabled");
+ }
+
+ final ControllerServiceReference ref = serviceNode.getReferences();
+ final Set<ConfiguredComponent> components = ref.getReferencingComponents();
+
+ // First, activate any other controller services. We do this first so that we can
+ // avoid the situation where Processor X depends on Controller Services Y and Z; and
+ // Controller Service Y depends on Controller Service Z. In this case, if we first attempted
+ // to start Processor X, we would fail because Controller Service Y is disabled. This way, we
+ // can recursively enable everything.
+ for ( final ConfiguredComponent component : components ) {
+ if (component instanceof ControllerServiceNode) {
+ final ControllerServiceNode componentNode = (ControllerServiceNode) component;
+ activateReferencedComponents(componentNode);
+
+ if ( componentNode.isDisabled() ) {
+ enableControllerService(componentNode);
+ }
+
+ activateReferencingComponents(componentNode);
+ }
+ }
+
+ for ( final ConfiguredComponent component : components ) {
+ if (component instanceof ProcessorNode) {
+ final ProcessorNode procNode = (ProcessorNode) component;
+ if ( !procNode.isRunning() ) {
+ procNode.getProcessGroup().startProcessor(procNode);
+ }
+ } else if (component instanceof ReportingTaskNode) {
+ final ReportingTaskNode taskNode = (ReportingTaskNode) component;
+ if ( !taskNode.isRunning() ) {
+ processScheduler.schedule(taskNode);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index a8468ff..97921d6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -65,7 +65,10 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
for (final ConfiguredComponent component : components) {
if (component instanceof ControllerServiceNode) {
serviceNodes.add((ControllerServiceNode) component);
- activeReferences.add(component);
+
+ if ( !((ControllerServiceNode) component).isDisabled() ) {
+ activeReferences.add(component);
+ }
} else if (isRunning(component)) {
activeReferences.add(component);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
new file mode 100644
index 0000000..fbd3dd7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/TestStandardControllerServiceProvider.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+import static org.junit.Assert.assertFalse;
+
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.service.mock.ServiceA;
+import org.apache.nifi.controller.service.mock.ServiceB;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestStandardControllerServiceProvider {
+ // Tests for enabling and disabling controller services through StandardControllerServiceProvider, using a mocked ProcessScheduler.
+ @Test
+ public void testDisableControllerService() {
+ final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+ // No assertions here: the test passes as long as enabling and then disabling a single ServiceB does not throw.
+ final ControllerServiceNode serviceNode = provider.createControllerService(ServiceB.class.getName(), "B", false);
+ provider.enableControllerService(serviceNode);
+ provider.disableControllerService(serviceNode);
+ }
+
+ @Test
+ public void testEnableDisableWithReference() {
+ final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+
+ final ControllerServiceNode serviceNodeB = provider.createControllerService(ServiceB.class.getName(), "B", false);
+ final ControllerServiceNode serviceNodeA = provider.createControllerService(ServiceA.class.getName(), "A", false);
+ // Point A's "Other Service" property at the service that was created with "B".
+ serviceNodeA.setProperty(ServiceA.OTHER_SERVICE.getName(), "B");
+ // Enabling A must fail with IllegalStateException while the service it references (B) is still disabled.
+ try {
+ provider.enableControllerService(serviceNodeA);
+ Assert.fail("Was able to enable Service A but Service B is disabled.");
+ } catch (final IllegalStateException expected) {
+ }
+ // Enabling in dependency order (B first, then A) must succeed.
+ provider.enableControllerService(serviceNodeB);
+ provider.enableControllerService(serviceNodeA);
+ // B must not be disabled while A, which references it, is still enabled.
+ try {
+ provider.disableControllerService(serviceNodeB);
+ Assert.fail("Was able to disable Service B but Service A is enabled and references B");
+ } catch (final IllegalStateException expected) {
+ }
+ // Disabling in reverse dependency order (A first, then B) must succeed.
+ provider.disableControllerService(serviceNodeA);
+ provider.disableControllerService(serviceNodeB);
+ }
+
+
+ @Test
+ public void testActivateReferencingComponentsGraph() {
+ final ProcessScheduler scheduler = Mockito.mock(ProcessScheduler.class);
+ final StandardControllerServiceProvider provider = new StandardControllerServiceProvider(scheduler);
+
+ // Build a graph of components with the following dependencies:
+ //
+ // A -> B -> D
+ // C ---^----^
+ //
+ // In other words, A references B, which references D,
+ // and C references both B and D.
+ // In the code below, A = serviceNode1, B = serviceNode2, C = serviceNode3, D = serviceNode4.
+ //
+ // We verify that once D is enabled, activating its referencing components also enables
+ // C and B (and A), even if C is attempted before B: C cannot be enabled until B is enabled,
+ // so B must be enabled first.
+
+ final ControllerServiceNode serviceNode1 = provider.createControllerService(ServiceA.class.getName(), "1", false);
+ final ControllerServiceNode serviceNode2 = provider.createControllerService(ServiceA.class.getName(), "2", false);
+ final ControllerServiceNode serviceNode3 = provider.createControllerService(ServiceA.class.getName(), "3", false);
+ final ControllerServiceNode serviceNode4 = provider.createControllerService(ServiceB.class.getName(), "4", false);
+
+ serviceNode1.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+ serviceNode2.setProperty(ServiceA.OTHER_SERVICE.getName(), "4");
+ serviceNode3.setProperty(ServiceA.OTHER_SERVICE.getName(), "2");
+ serviceNode3.setProperty(ServiceA.OTHER_SERVICE_2.getName(), "4");
+ // Enable only serviceNode4 (D), then ask the provider to activate the components that reference it.
+ provider.enableControllerService(serviceNode4);
+ provider.activateReferencingComponents(serviceNode4);
+ // All three referencing services must no longer be disabled as a result.
+ assertFalse(serviceNode3.isDisabled());
+ assertFalse(serviceNode2.isDisabled());
+ assertFalse(serviceNode1.isDisabled());
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
new file mode 100644
index 0000000..4918468
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceA.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ControllerService;
+
+public class ServiceA extends AbstractControllerService {
+ // Mock controller service for tests; declares two properties that each identify another ControllerService.
+ public static final PropertyDescriptor OTHER_SERVICE = new PropertyDescriptor.Builder()
+ .name("Other Service")
+ .identifiesControllerService(ControllerService.class)
+ .required(true) // mandatory reference to another controller service
+ .build();
+
+ public static final PropertyDescriptor OTHER_SERVICE_2 = new PropertyDescriptor.Builder()
+ .name("Other Service 2")
+ .identifiesControllerService(ControllerService.class)
+ .required(false) // optional second reference
+ .build();
+
+ // Returns a new list holding both descriptors, OTHER_SERVICE first and OTHER_SERVICE_2 second.
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
+ descriptors.add(OTHER_SERVICE);
+ descriptors.add(OTHER_SERVICE_2);
+ return descriptors;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/371e0100/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java
new file mode 100644
index 0000000..070b156
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/ServiceB.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service.mock;
+
+import org.apache.nifi.controller.AbstractControllerService;
+
+public class ServiceB extends AbstractControllerService {
+ // Mock controller service for tests; it declares no members of its own and inherits everything from AbstractControllerService.
+}