You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:54 UTC
[19/62] [abbrv] incubator-nifi git commit: Squashed commit of the
following:
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
index aa095d1..30d4365 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/TemplateManager.java
@@ -42,24 +42,25 @@ import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.persistence.TemplateDeserializer;
+import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.ByteArrayOutputStream;
import org.apache.nifi.stream.io.DataOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
-import org.apache.nifi.persistence.TemplateDeserializer;
-import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
-import org.apache.nifi.web.api.dto.ProcessorConfigDTO.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
+import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupContentsDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupPortDTO;
import org.apache.nifi.web.api.dto.TemplateDTO;
-import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -272,6 +273,11 @@ public class TemplateManager {
if (snippet.getProcessGroups() != null) {
scrubProcessGroups(snippet.getProcessGroups());
}
+
+ // go through each controller service if specified
+ if (snippet.getControllerServices() != null) {
+ scrubControllerServices(snippet.getControllerServices());
+ }
}
}
@@ -315,7 +321,6 @@ public class TemplateManager {
}
}
- processorConfig.setDescriptors(null);
processorConfig.setCustomUiUrl(null);
}
@@ -323,6 +328,24 @@ public class TemplateManager {
processorDTO.setValidationErrors(null);
}
}
+
+ private void scrubControllerServices(final Set<ControllerServiceDTO> controllerServices) {
+ for ( final ControllerServiceDTO serviceDTO : controllerServices ) {
+ final Map<String, String> properties = serviceDTO.getProperties();
+ final Map<String, PropertyDescriptorDTO> descriptors = serviceDTO.getDescriptors();
+
+ if ( properties != null && descriptors != null ) {
+ for ( final PropertyDescriptorDTO descriptor : descriptors.values() ) {
+ if ( descriptor.isSensitive() ) {
+ properties.put(descriptor.getName(), null);
+ }
+ }
+ }
+
+ serviceDTO.setCustomUiUrl(null);
+ serviceDTO.setValidationErrors(null);
+ }
+ }
/**
* Scrubs connections prior to saving. This includes removing available
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 7c3734a..05b3a06 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -16,11 +16,14 @@
*/
package org.apache.nifi.controller.reporting;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
-import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessScheduler;
@@ -29,6 +32,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.nar.NarCloseable;
@@ -45,8 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
private final AtomicReference<SchedulingStrategy> schedulingStrategy = new AtomicReference<>(SchedulingStrategy.TIMER_DRIVEN);
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
- private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
-
+
+ private volatile String comment;
private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
@@ -59,16 +63,6 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
}
@Override
- public Availability getAvailability() {
- return availability.get();
- }
-
- @Override
- public void setAvailability(final Availability availability) {
- this.availability.set(availability);
- }
-
- @Override
public void setSchedulingStrategy(final SchedulingStrategy schedulingStrategy) {
this.schedulingStrategy.set(schedulingStrategy);
}
@@ -102,6 +96,11 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
public boolean isRunning() {
return processScheduler.isScheduled(this) || processScheduler.getActiveThreadCount(this) > 0;
}
+
+ @Override
+ public int getActiveThreadCount() {
+ return processScheduler.getActiveThreadCount(this);
+ }
@Override
public ConfigurationContext getConfigurationContext() {
@@ -142,7 +141,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
return removed;
}
- private void onConfigured() {
+ @SuppressWarnings("deprecation")
+ private void onConfigured() {
// We need to invoke any method annotation with the OnConfigured annotation in order to
// maintain backward compatibility. This will be removed when we remove the old, deprecated annotations.
try (final NarCloseable x = NarCloseable.withNarLoader()) {
@@ -158,6 +158,16 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
}
@Override
+ public String getComments() {
+ return comment;
+ }
+
+ @Override
+ public void setComments(final String comment) {
+ this.comment = comment;
+ }
+
+ @Override
public void verifyCanDelete() {
if (isRunning()) {
throw new IllegalStateException("Cannot delete " + reportingTask + " because it is currently running");
@@ -207,4 +217,38 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
throw new IllegalStateException("Cannot update " + reportingTask + " because it is currently running");
}
}
+
+ @Override
+ public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
+ switch (getScheduledState()) {
+ case DISABLED:
+ throw new IllegalStateException(this + " cannot be started because it is disabled");
+ case RUNNING:
+ throw new IllegalStateException(this + " cannot be started because it is already running");
+ case STOPPED:
+ break;
+ }
+ final int activeThreadCount = getActiveThreadCount();
+ if ( activeThreadCount > 0 ) {
+ throw new IllegalStateException(this + " cannot be started because it has " + activeThreadCount + " active threads already");
+ }
+
+ final Set<String> ids = new HashSet<>();
+ for ( final ControllerServiceNode node : ignoredReferences ) {
+ ids.add(node.getIdentifier());
+ }
+
+ final Collection<ValidationResult> validationResults = getValidationErrors(ids);
+ for ( final ValidationResult result : validationResults ) {
+ if ( !result.isValid() ) {
+ throw new IllegalStateException(this + " cannot be started because it is not valid: " + result);
+ }
+ }
+ }
+
+
+ @Override
+ public String toString() {
+ return "ReportingTask[id=" + getIdentifier() + ", name=" + getName() + "]";
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
index ed48e20..3d57533 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingContext.java
@@ -124,9 +124,20 @@ public class StandardReportingContext implements ReportingContext, ControllerSer
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
return serviceProvider.isControllerServiceEnabled(serviceIdentifier);
}
+
+ @Override
+ public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+ return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+ }
@Override
public ControllerServiceLookup getControllerServiceLookup() {
return this;
}
+
+ @Override
+ public String getControllerServiceName(final String serviceIdentifier) {
+ return serviceProvider.getControllerServiceName(serviceIdentifier);
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
index d576f9c..435dbd0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/reporting/StandardReportingInitializationContext.java
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
@@ -33,13 +34,16 @@ public class StandardReportingInitializationContext implements ReportingInitiali
private final String schedulingPeriod;
private final SchedulingStrategy schedulingStrategy;
private final ControllerServiceProvider serviceProvider;
-
- public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy, final String schedulingPeriod, final ControllerServiceProvider serviceProvider) {
+ private final ComponentLog logger;
+
+ public StandardReportingInitializationContext(final String id, final String name, final SchedulingStrategy schedulingStrategy,
+ final String schedulingPeriod, final ComponentLog logger, final ControllerServiceProvider serviceProvider) {
this.id = id;
this.name = name;
this.schedulingPeriod = schedulingPeriod;
this.serviceProvider = serviceProvider;
this.schedulingStrategy = schedulingStrategy;
+ this.logger = logger;
}
@Override
@@ -90,7 +94,22 @@ public class StandardReportingInitializationContext implements ReportingInitiali
}
@Override
+ public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+ return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+ }
+
+ @Override
public ControllerServiceLookup getControllerServiceLookup() {
return this;
}
+
+ @Override
+ public String getControllerServiceName(final String serviceIdentifier) {
+ return serviceProvider.getControllerServiceName(serviceIdentifier);
+ }
+
+ @Override
+ public ComponentLog getLogger() {
+ return logger;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 4407451..89850cc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -19,6 +19,8 @@ package org.apache.nifi.controller.scheduling;
import static java.util.Objects.requireNonNull;
import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -32,20 +34,26 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.SchedulingContext;
@@ -144,6 +152,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
componentLifeCycleThreadPool.shutdown();
}
+
+ @Override
public void schedule(final ReportingTaskNode taskNode) {
final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
if (scheduleState.isScheduled()) {
@@ -176,16 +186,11 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
break;
- } catch (final InvocationTargetException ite) {
- LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
- new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
- LOG.error("", ite.getTargetException());
-
- try {
- Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {
- }
} catch (final Exception e) {
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+ final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+ componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+
LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
try {
@@ -200,18 +205,23 @@ public final class StandardProcessScheduler implements ProcessScheduler {
};
componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
+ taskNode.setScheduledState(ScheduledState.RUNNING);
}
+
+ @Override
public void unschedule(final ReportingTaskNode taskNode) {
final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
if (!scheduleState.isScheduled()) {
return;
}
-
+
+ taskNode.verifyCanStop();
final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
final ReportingTask reportingTask = taskNode.getReportingTask();
scheduleState.setScheduled(false);
-
+ taskNode.setScheduledState(ScheduledState.STOPPED);
+
final Runnable unscheduleReportingTaskRunnable = new Runnable() {
@SuppressWarnings("deprecation")
@Override
@@ -222,18 +232,15 @@ public final class StandardProcessScheduler implements ProcessScheduler {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
}
- } catch (final InvocationTargetException ite) {
- LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
- new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
- LOG.error("", ite.getTargetException());
-
- try {
- Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {
- }
} catch (final Exception e) {
- LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
- new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+ final ComponentLog componentLog = new SimpleProcessLogger(reportingTask.getIdentifier(), reportingTask);
+ componentLog.error("Failed to invoke @OnUnscheduled method due to {}", cause);
+
+ LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+ reportingTask, cause.toString(), administrativeYieldDuration);
+ LOG.error("", cause);
+
try {
Thread.sleep(administrativeYieldMillis);
} catch (final InterruptedException ie) {
@@ -274,20 +281,38 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
if (!procNode.isValid()) {
- throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state");
+ throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state due to " + procNode.getValidationErrors());
}
final Runnable startProcRunnable = new Runnable() {
- @SuppressWarnings("deprecation")
@Override
+ @SuppressWarnings("deprecation")
public void run() {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
long lastStopTime = scheduleState.getLastStopTime();
final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
- while (true) {
+ final Set<String> serviceIds = new HashSet<>();
+ for ( final PropertyDescriptor descriptor : processContext.getProperties().keySet() ) {
+ final Class<? extends ControllerService> serviceDefinition = descriptor.getControllerServiceDefinition();
+ if ( serviceDefinition != null ) {
+ final String serviceId = processContext.getProperty(descriptor).getValue();
+ serviceIds.add(serviceId);
+ }
+ }
+
+ attemptOnScheduled: while (true) {
try {
synchronized (scheduleState) {
+ for ( final String serviceId : serviceIds ) {
+ final boolean enabled = processContext.isControllerServiceEnabled(serviceId);
+ if ( !enabled ) {
+ LOG.debug("Controller Service with ID {} is not yet enabled, so will not start {} yet", serviceId, procNode);
+ Thread.sleep(administrativeYieldMillis);
+ continue attemptOnScheduled;
+ }
+ }
+
// if no longer scheduled to run, then we're finished. This can happen, for example,
// if the @OnScheduled method throws an Exception and the user stops the processor
// while we're administratively yielded.
@@ -308,11 +333,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
return;
}
} catch (final Exception e) {
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
- new Object[]{procNode.getProcessor(), e.getCause(), administrativeYieldDuration}, e.getCause());
- LOG.error("Failed to invoke @OnScheduled method due to {}", e.getCause().toString(), e.getCause());
+ new Object[]{procNode.getProcessor(), cause.getCause(), administrativeYieldDuration}, cause.getCause());
+ LOG.error("Failed to invoke @OnScheduled method due to {}", cause.getCause().toString(), cause.getCause());
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
@@ -535,11 +561,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
procNode.setScheduledState(ScheduledState.STOPPED);
-
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog);
- }
}
@Override
@@ -549,11 +570,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
procNode.setScheduledState(ScheduledState.DISABLED);
-
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog);
- }
}
public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
@@ -562,10 +578,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
taskNode.setScheduledState(ScheduledState.STOPPED);
-
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
- }
}
public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
@@ -574,10 +586,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
taskNode.setScheduledState(ScheduledState.DISABLED);
-
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
- }
}
@Override
@@ -605,4 +613,114 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
return scheduleState;
}
+
+ @Override
+ public void enableControllerService(final ControllerServiceNode service) {
+ service.setState(ControllerServiceState.ENABLING);
+ final ScheduleState scheduleState = getScheduleState(service);
+
+ final Runnable enableRunnable = new Runnable() {
+ @Override
+ public void run() {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ long lastStopTime = scheduleState.getLastStopTime();
+ final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
+
+ while (true) {
+ try {
+ synchronized (scheduleState) {
+ // if no longer enabled, then we're finished. This can happen, for example,
+ // if the @OnEnabled method throws an Exception and the user disables the service
+ // while we're administratively yielded.
+ //
+ // we also check if the schedule state's last stop time is equal to what it was before.
+ // if not, then means that the service has been disabled and enabled again, so we should just
+ // bail; another thread will be responsible for invoking the @OnEnabled methods.
+ if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+ return;
+ }
+
+ ReflectionUtils.invokeMethodsWithAnnotation(OnEnabled.class, service.getControllerServiceImplementation(), configContext);
+ heartbeater.heartbeat();
+ service.setState(ControllerServiceState.ENABLED);
+ return;
+ }
+ } catch (final Exception e) {
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+
+ final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
+ componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+ LOG.error("Failed to invoke @OnEnabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", cause);
+ }
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+ Thread.sleep(administrativeYieldMillis);
+ continue;
+ }
+ }
+ } catch (final Throwable t) {
+ final Throwable cause = (t instanceof InvocationTargetException) ? t.getCause() : t;
+ final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
+ componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
+
+ LOG.error("Failed to invoke @OnEnabled method on {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", cause);
+ }
+ }
+ }
+ };
+
+ scheduleState.setScheduled(true);
+ componentLifeCycleThreadPool.execute(enableRunnable);
+ }
+
+ @Override
+ public void disableControllerService(final ControllerServiceNode service) {
+ service.verifyCanDisable();
+
+ final ScheduleState state = getScheduleState(requireNonNull(service));
+ final Runnable disableRunnable = new Runnable() {
+ @Override
+ public void run() {
+ synchronized (state) {
+ state.setScheduled(false);
+ }
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(service, controllerServiceProvider);
+
+ while(true) {
+ try {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+ heartbeater.heartbeat();
+ service.setState(ControllerServiceState.DISABLED);
+ return;
+ } catch (final Exception e) {
+ final Throwable cause = (e instanceof InvocationTargetException) ? e.getCause() : e;
+ final ComponentLog componentLog = new SimpleProcessLogger(service.getIdentifier(), service);
+ componentLog.error("Failed to invoke @OnDisabled method due to {}", cause);
+
+ LOG.error("Failed to invoke @OnDisabled method of {} due to {}", service.getControllerServiceImplementation(), cause.toString());
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", cause);
+ }
+
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, service.getControllerServiceImplementation(), configContext);
+ try {
+ Thread.sleep(administrativeYieldMillis);
+ } catch (final InterruptedException ie) {}
+
+ continue;
+ }
+ }
+ }
+ }
+ };
+
+ service.setState(ControllerServiceState.DISABLING);
+ componentLifeCycleThreadPool.execute(disableRunnable);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index db44b5f..1fde670 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -17,30 +17,29 @@
package org.apache.nifi.controller.service;
import java.io.BufferedInputStream;
-import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
-import javax.xml.XMLConstants;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
-import javax.xml.validation.Schema;
-import javax.xml.validation.SchemaFactory;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.nifi.controller.FlowFromDOMFactory;
+import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.util.DomUtils;
-import org.apache.nifi.util.file.FileUtils;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;
import org.xml.sax.SAXParseException;
@@ -49,35 +48,14 @@ import org.xml.sax.SAXParseException;
*/
public class ControllerServiceLoader {
- private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
+ private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
- private final Path serviceConfigXmlPath;
- public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
- final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
- if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
- throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
- }
-
- this.serviceConfigXmlPath = serviceConfigXmlPath;
- }
-
- public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
- final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ public static List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider, final InputStream serializedStream, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) throws IOException {
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
- InputStream fis = null;
- BufferedInputStream bis = null;
documentBuilderFactory.setNamespaceAware(true);
- final List<ControllerServiceNode> services = new ArrayList<>();
-
- try {
- final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
- if (configurationResource == null) {
- throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
- }
- final Schema schema = schemaFactory.newSchema(configurationResource);
- documentBuilderFactory.setSchema(schema);
+ try (final InputStream in = new BufferedInputStream(serializedStream)) {
final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
@@ -109,43 +87,72 @@ public class ControllerServiceLoader {
throw err;
}
});
-
- //if controllerService.xml does not exist, create an empty file...
- fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
- bis = new BufferedInputStream(fis);
- if (Files.size(this.serviceConfigXmlPath) > 0) {
- final Document document = builder.parse(bis);
- final NodeList servicesNodes = document.getElementsByTagName("services");
- final Element servicesElement = (Element) servicesNodes.item(0);
-
- final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
- for (final Element serviceElement : serviceNodes) {
- //get properties for the specific controller task - id, name, class,
- //and schedulingPeriod must be set
- final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
- final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
-
- //set the class to be used for the configured controller task
- final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
-
- //optional task-specific properties
- for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
- final String name = optionalProperty.getAttribute("name").trim();
- final String value = optionalProperty.getTextContent().trim();
- serviceNode.setProperty(name, value);
- }
-
- services.add(serviceNode);
- provider.enableControllerService(serviceNode);
- }
- }
+
+ final Document document = builder.parse(in);
+ final Element controllerServices = document.getDocumentElement();
+ final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServices, "controllerService");
+ return new ArrayList<ControllerServiceNode>(loadControllerServices(serviceElements, provider, encryptor, bulletinRepo, autoResumeState));
} catch (SAXException | ParserConfigurationException sxe) {
throw new IOException(sxe);
- } finally {
- FileUtils.closeQuietly(fis);
- FileUtils.closeQuietly(bis);
}
-
- return services;
+ }
+
+ public static Collection<ControllerServiceNode> loadControllerServices(final List<Element> serviceElements, final ControllerServiceProvider provider, final StringEncryptor encryptor, final BulletinRepository bulletinRepo, final boolean autoResumeState) {
+ final Map<ControllerServiceNode, Element> nodeMap = new HashMap<>();
+ for ( final Element serviceElement : serviceElements ) {
+ final ControllerServiceNode serviceNode = createControllerService(provider, serviceElement, encryptor);
+ // We need to clone the node because it will be used in a separate thread below, and
+ // Element is not thread-safe.
+ nodeMap.put(serviceNode, (Element) serviceElement.cloneNode(true));
+ }
+ for ( final Map.Entry<ControllerServiceNode, Element> entry : nodeMap.entrySet() ) {
+ configureControllerService(entry.getKey(), entry.getValue(), encryptor);
+ }
+
+ // Start services
+ if ( autoResumeState ) {
+ final Set<ControllerServiceNode> nodesToEnable = new HashSet<>();
+
+ for ( final ControllerServiceNode node : nodeMap.keySet() ) {
+ final Element controllerServiceElement = nodeMap.get(node);
+
+ final ControllerServiceDTO dto;
+ synchronized (controllerServiceElement.getOwnerDocument()) {
+ dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+ }
+
+ final ControllerServiceState state = ControllerServiceState.valueOf(dto.getState());
+ if (state == ControllerServiceState.ENABLED) {
+ nodesToEnable.add(node);
+ }
+ }
+
+ provider.enableControllerServices(nodesToEnable);
+ }
+
+ return nodeMap.keySet();
+ }
+
+
+ private static ControllerServiceNode createControllerService(final ControllerServiceProvider provider, final Element controllerServiceElement, final StringEncryptor encryptor) {
+ final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+
+ final ControllerServiceNode node = provider.createControllerService(dto.getType(), dto.getId(), false);
+ node.setName(dto.getName());
+ node.setComments(dto.getComments());
+ return node;
+ }
+
+ private static void configureControllerService(final ControllerServiceNode node, final Element controllerServiceElement, final StringEncryptor encryptor) {
+ final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+ node.setAnnotationData(dto.getAnnotationData());
+
+ for (final Map.Entry<String, String> entry : dto.getProperties().entrySet()) {
+ if (entry.getValue() == null) {
+ node.removeProperty(entry.getKey());
+ } else {
+ node.setProperty(entry.getKey(), entry.getValue());
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
index 8b5f27f..8d46b05 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceInitializationContext.java
@@ -21,14 +21,17 @@ import java.util.Set;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
+import org.apache.nifi.logging.ComponentLog;
public class StandardControllerServiceInitializationContext implements ControllerServiceInitializationContext, ControllerServiceLookup {
private final String id;
private final ControllerServiceProvider serviceProvider;
+ private final ComponentLog logger;
- public StandardControllerServiceInitializationContext(final String identifier, final ControllerServiceProvider serviceProvider) {
+ public StandardControllerServiceInitializationContext(final String identifier, final ComponentLog logger, final ControllerServiceProvider serviceProvider) {
this.id = identifier;
+ this.logger = logger;
this.serviceProvider = serviceProvider;
}
@@ -61,4 +64,19 @@ public class StandardControllerServiceInitializationContext implements Controlle
public boolean isControllerServiceEnabled(final ControllerService service) {
return serviceProvider.isControllerServiceEnabled(service);
}
+
+ @Override
+ public boolean isControllerServiceEnabling(String serviceIdentifier) {
+ return serviceProvider.isControllerServiceEnabling(serviceIdentifier);
+ }
+
+ @Override
+ public String getControllerServiceName(final String serviceIdentifier) {
+ return serviceProvider.getControllerServiceName(serviceIdentifier);
+ }
+
+ @Override
+ public ComponentLog getLogger() {
+ return logger;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 741caec..c8c7ec9 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -16,16 +16,17 @@
*/
package org.apache.nifi.controller.service;
+import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractConfiguredComponent;
-import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
@@ -41,14 +42,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final ControllerService implementation;
private final ControllerServiceProvider serviceProvider;
- private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
- private final AtomicBoolean disabled = new AtomicBoolean(true);
+ private final AtomicReference<ControllerServiceState> stateRef = new AtomicReference<>(ControllerServiceState.DISABLED);
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final Lock readLock = rwLock.readLock();
private final Lock writeLock = rwLock.writeLock();
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
+ private String comment;
public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
@@ -58,38 +59,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
this.serviceProvider = serviceProvider;
}
- @Override
- public boolean isDisabled() {
- return disabled.get();
- }
-
- @Override
- public void setDisabled(final boolean disabled) {
- if (!disabled && !isValid()) {
- throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
- }
-
- if (disabled) {
- // do not allow a Controller Service to be disabled if it's currently being used.
- final Set<ConfiguredComponent> runningRefs = getReferences().getRunningReferences();
- if (!runningRefs.isEmpty()) {
- throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running");
- }
- }
-
- this.disabled.set(disabled);
- }
-
- @Override
- public Availability getAvailability() {
- return availability.get();
- }
-
- @Override
- public void setAvailability(final Availability availability) {
- this.availability.set(availability);
- }
-
+
@Override
public ControllerService getProxiedControllerService() {
return proxedControllerService;
@@ -132,7 +102,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyModifiable() throws IllegalStateException {
- if (!isDisabled()) {
+ if (getState() != ControllerServiceState.DISABLED) {
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
}
}
@@ -140,7 +110,6 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void setProperty(final String name, final String value) {
super.setProperty(name, value);
-
onConfigured();
}
@@ -166,31 +135,96 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void verifyCanDelete() {
- if ( !isDisabled() ) {
+ if ( getState() != ControllerServiceState.DISABLED ) {
throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
}
}
@Override
public void verifyCanDisable() {
+ verifyCanDisable(Collections.<ControllerServiceNode>emptySet());
+ }
+
+ @Override
+ public void verifyCanDisable(final Set<ControllerServiceNode> ignoreReferences) {
+ final ControllerServiceState state = getState();
+ if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ throw new IllegalStateException("Cannot disable " + getControllerServiceImplementation() + " because it is not enabled");
+ }
+
final ControllerServiceReference references = getReferences();
- final int numRunning = references.getRunningReferences().size();
- if ( numRunning > 0 ) {
- throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
+
+ for ( final ConfiguredComponent activeReference : references.getActiveReferences() ) {
+ if ( !ignoreReferences.contains(activeReference) ) {
+ throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by at least one component that is currently running");
+ }
}
}
@Override
public void verifyCanEnable() {
- if ( !isDisabled() ) {
+ if ( getState() != ControllerServiceState.DISABLED ) {
throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
}
+
+ if ( !isValid() ) {
+ throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + getValidationErrors());
+ }
+ }
+
+ @Override
+ public void verifyCanEnable(final Set<ControllerServiceNode> ignoredReferences) {
+ if (getState() != ControllerServiceState.DISABLED) {
+ throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
+ }
+
+ final Set<String> ids = new HashSet<>();
+ for ( final ControllerServiceNode node : ignoredReferences ) {
+ ids.add(node.getIdentifier());
+ }
+
+ final Collection<ValidationResult> validationResults = getValidationErrors(ids);
+ for ( final ValidationResult result : validationResults ) {
+ if ( !result.isValid() ) {
+ throw new IllegalStateException(implementation + " cannot be enabled because it is not valid: " + result);
+ }
+ }
}
@Override
public void verifyCanUpdate() {
- if ( !isDisabled() ) {
+ if ( getState() != ControllerServiceState.DISABLED ) {
throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
}
}
+
+ @Override
+ public String getComments() {
+ readLock.lock();
+ try {
+ return comment;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void setComments(final String comment) {
+ writeLock.lock();
+ try {
+ this.comment = comment;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public ControllerServiceState getState() {
+ return stateRef.get();
+ }
+
+ @Override
+ public void setState(final ControllerServiceState state) {
+ this.stateRef.set(state);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 7a8e22f..dfbfca5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -23,26 +23,39 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import org.apache.nifi.annotation.lifecycle.OnAdded;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ProcessScheduler;
+import org.apache.nifi.controller.ProcessorNode;
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
-import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
+import org.apache.nifi.controller.exception.ControllerServiceInstantiationException;
import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.processor.StandardValidationContextFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
@@ -55,8 +68,10 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
- private final Map<String, ControllerServiceNode> controllerServices;
+ private final ProcessScheduler processScheduler;
+ private final ConcurrentMap<String, ControllerServiceNode> controllerServices;
private static final Set<Method> validDisabledMethods;
+ private final BulletinRepository bulletinRepo;
static {
// methods that are okay to be called when the service is disabled.
@@ -70,10 +85,12 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
validDisabledMethods = Collections.unmodifiableSet(validMethods);
}
- public StandardControllerServiceProvider() {
+ public StandardControllerServiceProvider(final ProcessScheduler scheduler, final BulletinRepository bulletinRepo) {
// the following 2 maps must be updated atomically, but we do not lock around them because they are modified
// only in the createControllerService method, and both are modified before the method returns
this.controllerServices = new ConcurrentHashMap<>();
+ this.processScheduler = scheduler;
+ this.bulletinRepo = bulletinRepo;
}
private Class<?>[] getInterfaces(final Class<?> cls) {
@@ -95,21 +112,24 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
populateInterfaces(superClass, interfacesDefinedThusFar);
}
}
-
+
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
if (type == null || id == null) {
throw new NullPointerException();
}
- if (controllerServices.containsKey(id)) {
- throw new ControllerServiceAlreadyExistsException(id);
- }
-
+
final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader cl = ExtensionManager.getClassLoader(type);
- Thread.currentThread().setContextClassLoader(cl);
- final Class<?> rawClass = Class.forName(type, false, cl);
+ final Class<?> rawClass;
+ if ( cl == null ) {
+ rawClass = Class.forName(type);
+ } else {
+ Thread.currentThread().setContextClassLoader(cl);
+ rawClass = Class.forName(type, false, cl);
+ }
+
final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
final ControllerService originalService = controllerServiceClass.newInstance();
@@ -124,7 +144,9 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
final ControllerServiceNode node = serviceNodeHolder.get();
- if (node.isDisabled() && !validDisabledMethods.contains(method)) {
+ final ControllerServiceState state = node.getState();
+ final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED.
+ if (disabled && !validDisabledMethods.contains(method)) {
// Use nar class loader here because we are implicitly calling toString() on the original implementation.
try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
@@ -143,17 +165,22 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
};
- final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
- logger.info("Loaded service {} as configured.", type);
+ final ControllerService proxiedService;
+ if ( cl == null ) {
+ proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
+ } else {
+ proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+ }
+ logger.info("Created Controller Service of type {} with identifier {}", type, id);
- originalService.initialize(new StandardControllerServiceInitializationContext(id, this));
+ final ComponentLog serviceLogger = new SimpleProcessLogger(id, originalService);
+ originalService.initialize(new StandardControllerServiceInitializationContext(id, serviceLogger, this));
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
serviceNodeHolder.set(serviceNode);
- serviceNode.setAnnotationData(null);
- serviceNode.setName(id);
+ serviceNode.setName(rawClass.getSimpleName());
if ( firstTimeAdded ) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
@@ -166,7 +193,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
this.controllerServices.put(id, serviceNode);
return serviceNode;
} catch (final Throwable t) {
- throw new ControllerServiceNotFoundException(t);
+ throw new ControllerServiceInstantiationException(t);
} finally {
if (currentContextClassLoader != null) {
Thread.currentThread().setContextClassLoader(currentContextClassLoader);
@@ -174,29 +201,242 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
+
+
+ @Override
+ public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+ // Get a list of all Controller Services that need to be disabled, in the order that they need to be
+ // disabled.
+ final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
+
+ for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ final ControllerServiceState state = nodeToDisable.getState();
+
+ if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+ nodeToDisable.verifyCanDisable(serviceSet);
+ }
+ }
+
+ Collections.reverse(toDisable);
+ for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ final ControllerServiceState state = nodeToDisable.getState();
+
+ if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+ disableControllerService(nodeToDisable);
+ }
+ }
+ }
+
+
+ @Override
+ public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
+ // or a service that references this controller service, etc.
+ final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+ final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+
+ // verify that we can start all components (that are not disabled) before doing anything
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ node.verifyCanStart();
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ node.verifyCanStart();
+ }
+ }
+
+ // start all of the components that are not disabled
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ node.getProcessGroup().startProcessor(node);
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ processScheduler.schedule(node);
+ }
+ }
+ }
+
+ @Override
+ public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
+ // or a service that references this controller service, etc.
+ final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+ final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+
+ // verify that we can stop all components (that are running) before doing anything
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ node.verifyCanStop();
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ node.verifyCanStop();
+ }
+ }
+
+ // stop all of the components that are running
+ for ( final ProcessorNode node : processors ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ node.getProcessGroup().stopProcessor(node);
+ }
+ }
+ for ( final ReportingTaskNode node : reportingTasks ) {
+ if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ processScheduler.unschedule(node);
+ }
+ }
+ }
+
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
+ processScheduler.enableControllerService(serviceNode);
+ }
+
+ @Override
+ public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
+ final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
+ // Ensure that all nodes are already disabled
+ for ( final ControllerServiceNode serviceNode : serviceNodes ) {
+ final ControllerServiceState curState = serviceNode.getState();
+ if ( ControllerServiceState.DISABLED.equals(curState) ) {
+ servicesToEnable.add(serviceNode);
+ } else {
+ logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
+ }
+ }
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, this);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation(), configContext);
+ // determine the order to load the services. We have to ensure that if service A references service B, then B
+ // is enabled first, and so on.
+ final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
+ for ( final ControllerServiceNode node : servicesToEnable ) {
+ idToNodeMap.put(node.getIdentifier(), node);
+ }
+
+ // We can have many Controller Services dependent on one another. We can have many of these
+ // disparate lists of Controller Services that are dependent on one another. We refer to each
+ // of these as a branch.
+ final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
+
+ if ( branches.isEmpty() ) {
+ logger.info("No Controller Services to enable");
+ return;
+ } else {
+ logger.info("Will enable {} Controller Services", servicesToEnable.size());
+ }
+
+ // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
+ // to be valid so that they can be scheduled.
+ for ( final List<ControllerServiceNode> branch : branches ) {
+ for ( final ControllerServiceNode nodeToEnable : branch ) {
+ nodeToEnable.setState(ControllerServiceState.ENABLING);
+ }
+ }
+
+ final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
+ final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
+ for ( final List<ControllerServiceNode> branch : branches ) {
+ final Runnable enableBranchRunnable = new Runnable() {
+ @Override
+ public void run() {
+ logger.debug("Enabling Controller Service Branch {}", branch);
+
+ for ( final ControllerServiceNode serviceNode : branch ) {
+ try {
+ if ( !enabledNodes.contains(serviceNode) ) {
+ enabledNodes.add(serviceNode);
+
+ logger.info("Enabling {}", serviceNode);
+ try {
+ processScheduler.enableControllerService(serviceNode);
+ } catch (final Exception e) {
+ logger.error("Failed to enable " + serviceNode + " due to " + e);
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+
+ if ( bulletinRepo != null ) {
+ bulletinRepo.addBulletin(BulletinFactory.createBulletin(
+ "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+ }
+ }
+ }
+
+ // wait for service to finish enabling.
+ while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+ try {
+ Thread.sleep(100L);
+ } catch (final InterruptedException ie) {}
+ }
+
+ logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
+ } catch (final Exception e) {
+ logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
+ if ( logger.isDebugEnabled() ) {
+ logger.error("", e);
+ }
+ }
+ }
+ }
+ };
+
+ executor.submit(enableBranchRunnable);
+ }
+
+ executor.shutdown();
+ }
+
+ static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
+ final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
+
+ for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
+ if ( orderedNodeLists.contains(node) ) {
+ continue; // this node is already in the list.
+ }
+
+ final List<ControllerServiceNode> branch = new ArrayList<>();
+ determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
+ orderedNodeLists.add(branch);
+ }
+
+ return orderedNodeLists;
+ }
+
+
+ private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) {
+ if ( visited.contains(contextNode) ) {
+ return;
}
- serviceNode.setDisabled(false);
+ for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) {
+ if ( entry.getKey().getControllerServiceDefinition() != null ) {
+ final String referencedServiceId = entry.getValue();
+ if ( referencedServiceId != null ) {
+ final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
+ if ( !orderedNodes.contains(referencedNode) ) {
+ visited.add(contextNode);
+ determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
+ }
+ }
+ }
+ }
+
+ if ( !orderedNodes.contains(contextNode) ) {
+ orderedNodes.add(contextNode);
+ }
}
+
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
-
- // We must set the service to disabled before we invoke the OnDisabled methods because the service node
- // can throw Exceptions if we attempt to disable the service while it's known to be in use.
- serviceNode.setDisabled(true);
-
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
- }
+ processScheduler.disableControllerService(serviceNode);
}
@Override
@@ -213,10 +453,16 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public boolean isControllerServiceEnabled(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
- return (node == null) ? false : !node.isDisabled();
+ return (node == null) ? false : (ControllerServiceState.ENABLED == node.getState());
}
@Override
+ public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+ final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
+ return (node == null) ? false : (ControllerServiceState.ENABLING == node.getState());
+ }
+
+ @Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
return controllerServices.get(serviceIdentifier);
}
@@ -234,6 +480,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
+ public String getControllerServiceName(final String serviceIdentifier) {
+ final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
+ return node == null ? null : node.getName();
+ }
+
public void removeControllerService(final ControllerServiceNode serviceNode) {
final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
if ( existing == null || existing != serviceNode ) {
@@ -247,6 +498,139 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
}
+ for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.getControllerServiceDefinition() != null ) {
+ final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+ if ( value != null ) {
+ final ControllerServiceNode referencedNode = getControllerServiceNode(value);
+ if ( referencedNode != null ) {
+ referencedNode.removeReference(serviceNode);
+ }
+ }
+ }
+ }
+
controllerServices.remove(serviceNode.getIdentifier());
}
+
+ @Override
+ public Set<ControllerServiceNode> getAllControllerServices() {
+ return new HashSet<>(controllerServices.values());
+ }
+
+
+ /**
+ * Returns a List of all components that reference the given referencedNode (either directly or indirectly through
+ * another service) that are also of the given componentType. The list that is returned is in the order in which they will
+ * need to be 'activated' (enabled/started).
+ * @param referencedNode
+ * @param componentType
+ * @return
+ */
+ private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
+ final List<T> references = new ArrayList<>();
+
+ for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) {
+ if ( componentType.isAssignableFrom(referencingComponent.getClass()) ) {
+ references.add(componentType.cast(referencingComponent));
+ }
+
+ if ( referencingComponent instanceof ControllerServiceNode ) {
+ final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
+
+ // find components recursively that depend on referencingNode.
+ final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
+
+ // For anything that depends on referencing node, we want to add it to the list, but we know
+ // that it must come after the referencing node, so we first remove any existing occurrence.
+ references.removeAll(recursive);
+ references.addAll(recursive);
+ }
+ }
+
+ return references;
+ }
+
+
+ @Override
+ public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+ final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ enableReferencingServices(serviceNode, recursiveReferences);
+ }
+
+ private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
+ if ( serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING ) {
+ serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
+ }
+
+ final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
+ final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+ final ControllerServiceState state = nodeToEnable.getState();
+ if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ nodeToEnable.verifyCanEnable(ifEnabled);
+ ifEnabled.add(nodeToEnable);
+ }
+ }
+
+ for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+ final ControllerServiceState state = nodeToEnable.getState();
+ if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ enableControllerService(nodeToEnable);
+ }
+ }
+ }
+
+ @Override
+ public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+ final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
+
+ for ( final ControllerServiceNode referencingService : referencingServices ) {
+ referencingService.verifyCanEnable(referencingServiceSet);
+ }
+ }
+
+ @Override
+ public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
+ final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
+
+ final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
+
+ for ( final ReportingTaskNode taskNode : referencingReportingTasks ) {
+ if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+ taskNode.verifyCanStart(referencingServiceSet);
+ }
+ }
+
+ for ( final ProcessorNode procNode : referencingProcessors ) {
+ if ( procNode.getScheduledState() != ScheduledState.DISABLED ) {
+ procNode.verifyCanStart(referencingServiceSet);
+ }
+ }
+ }
+
+ @Override
+ public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+ // Get a list of all Controller Services that need to be disabled, in the order that they need to be
+ // disabled.
+ final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
+ final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
+
+ for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ final ControllerServiceState state = nodeToDisable.getState();
+
+ if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+ nodeToDisable.verifyCanDisable(serviceSet);
+ }
+ }
+ }
+
+ @Override
+ public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+ // we can always stop referencing components
+ }
}