You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/26 15:16:52 UTC
[36/47] incubator-nifi git commit: NIFI-6: Rebase from develop to
include renaming of directory structure
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 0000000,7fc65f9..0653b03
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@@ -1,0 -1,569 +1,640 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.scheduling;
+
+ import static java.util.Objects.requireNonNull;
+
+ import java.lang.reflect.InvocationTargetException;
+ import java.util.concurrent.ConcurrentHashMap;
+ import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
+
++import org.apache.nifi.annotation.lifecycle.OnDisabled;
++import org.apache.nifi.annotation.lifecycle.OnEnabled;
++import org.apache.nifi.annotation.lifecycle.OnScheduled;
++import org.apache.nifi.annotation.lifecycle.OnStopped;
++import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.connectable.Funnel;
+ import org.apache.nifi.connectable.Port;
+ import org.apache.nifi.controller.AbstractPort;
+ import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.Heartbeater;
+ import org.apache.nifi.controller.ProcessScheduler;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.ReportingTaskNode;
+ import org.apache.nifi.controller.ScheduledState;
+ import org.apache.nifi.controller.annotation.OnConfigured;
++import org.apache.nifi.controller.service.ControllerServiceNode;
+ import org.apache.nifi.controller.service.ControllerServiceProvider;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.engine.FlowEngine;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.SchedulingContext;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
+ import org.apache.nifi.processor.StandardSchedulingContext;
-import org.apache.nifi.processor.annotation.OnScheduled;
-import org.apache.nifi.processor.annotation.OnStopped;
-import org.apache.nifi.processor.annotation.OnUnscheduled;
+ import org.apache.nifi.reporting.ReportingTask;
+ import org.apache.nifi.scheduling.SchedulingStrategy;
+ import org.apache.nifi.util.FormatUtils;
+ import org.apache.nifi.util.NiFiProperties;
+ import org.apache.nifi.util.ReflectionUtils;
-
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ * Responsible for scheduling Processors, Ports, and Funnels to run at regular
+ * intervals
+ */
+ public final class StandardProcessScheduler implements ProcessScheduler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class);
+
+ private final ControllerServiceProvider controllerServiceProvider;
+ private final Heartbeater heartbeater;
+ private final long administrativeYieldMillis;
+ private final String administrativeYieldDuration;
+
+ private final ConcurrentMap<Object, ScheduleState> scheduleStates = new ConcurrentHashMap<>();
+ private final ScheduledExecutorService frameworkTaskExecutor;
+ private final ConcurrentMap<SchedulingStrategy, SchedulingAgent> strategyAgentMap = new ConcurrentHashMap<>();
+ // thread pool for starting/stopping components
+ private final ExecutorService componentLifeCycleThreadPool = new ThreadPoolExecutor(25, 50, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(5000));
+ private final StringEncryptor encryptor;
+
+     /**
+      * Creates a scheduler that uses the given heartbeater, controller service provider and
+      * encryptor. The administrative yield duration is read from {@link NiFiProperties} and a
+      * four-thread {@link FlowEngine} is created for framework tasks.
+      */
+     public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor) {
+         this.heartbeater = heartbeater;
+         this.controllerServiceProvider = controllerServiceProvider;
+         this.encryptor = encryptor;
+
+         // Keep both the textual duration (for log messages) and its value in milliseconds (for sleeping)
+         final String yieldDuration = NiFiProperties.getInstance().getAdministrativeYieldDuration();
+         this.administrativeYieldDuration = yieldDuration;
+         this.administrativeYieldMillis = FormatUtils.getTimeDuration(yieldDuration, TimeUnit.MILLISECONDS);
+
+         this.frameworkTaskExecutor = new FlowEngine(4, "Framework Task Thread");
+     }
+
+     /**
+      * Schedules a framework-level task to run repeatedly, with a fixed delay between the end
+      * of one run and the start of the next.
+      *
+      * @param command the task to run
+      * @param taskName human-readable name of the task, used when a run fails and is logged
+      * @param initialDelay time to wait before the first run
+      * @param delay time to wait between the end of one run and the start of the next
+      * @param timeUnit unit of <code>initialDelay</code> and <code>delay</code>
+      */
+     public void scheduleFrameworkTask(final Runnable command, final String taskName, final long initialDelay, final long delay, final TimeUnit timeUnit) {
+         frameworkTaskExecutor.scheduleWithFixedDelay(new Runnable() {
+             @Override
+             public void run() {
+                 try {
+                     command.run();
+                 } catch (final Throwable t) {
+                     // Nothing may escape: an exception thrown by a fixed-delay task suppresses its later runs.
+                     // Identify the task by its name rather than by the Runnable's toString().
+                     LOG.error("Failed to run Framework Task {} due to {}", taskName, t.toString());
+                     if (LOG.isDebugEnabled()) {
+                         LOG.error("", t);
+                     }
+                 }
+             }
+         }, initialDelay, delay, timeUnit);
+     }
+
+     /**
+      * Sets the maximum thread count on the agent registered for the given scheduling strategy.
+      * Does nothing if no agent is registered for that strategy.
+      */
+     @Override
+     public void setMaxThreadCount(final SchedulingStrategy schedulingStrategy, final int maxThreadCount) {
+         final SchedulingAgent registeredAgent = getSchedulingAgent(schedulingStrategy);
+         if (registeredAgent != null) {
+             registeredAgent.setMaxThreadCount(maxThreadCount);
+         }
+     }
+
+     /**
+      * Registers the agent to use for the given scheduling strategy, replacing any agent
+      * previously registered for that strategy.
+      */
+     public void setSchedulingAgent(final SchedulingStrategy strategy, final SchedulingAgent agent) {
+         this.strategyAgentMap.put(strategy, agent);
+     }
+
+     /**
+      * Returns the agent registered for the given scheduling strategy, or <code>null</code>
+      * if none has been registered.
+      */
+     public SchedulingAgent getSchedulingAgent(final SchedulingStrategy strategy) {
+         final SchedulingAgent registeredAgent = strategyAgentMap.get(strategy);
+         return registeredAgent;
+     }
+
+     /**
+      * Returns the agent registered for the scheduling strategy of the given Connectable.
+      */
+     private SchedulingAgent getSchedulingAgent(final Connectable connectable) {
+         final SchedulingStrategy strategy = connectable.getSchedulingStrategy();
+         return getSchedulingAgent(strategy);
+     }
+
+     /**
+      * Shuts down every registered scheduling agent, then the framework task executor and the
+      * component life-cycle thread pool. A failure to shut down one agent is logged and does
+      * not prevent the remaining steps from running.
+      */
+     @Override
+     public void shutdown() {
+         for (final SchedulingAgent agent : strategyAgentMap.values()) {
+             try {
+                 agent.shutdown();
+             } catch (final Throwable failure) {
+                 // Log and carry on so the remaining agents and executors are still shut down
+                 LOG.error("Failed to shutdown Scheduling Agent {} due to {}", agent, failure.toString());
+                 LOG.error("", failure);
+             }
+         }
+
+         frameworkTaskExecutor.shutdown();
+         componentLifeCycleThreadPool.shutdown();
+     }
+
+     /**
+      * Marks the given Reporting Task as scheduled and starts it asynchronously on the component
+      * life-cycle thread pool. The start-up task invokes the Reporting Task's {@link OnConfigured}
+      * and {@link OnScheduled} methods, retrying after the administrative yield duration each time
+      * they fail, and then hands the task to its scheduling agent. Does nothing if the task is
+      * already scheduled.
+      *
+      * @param taskNode the Reporting Task to schedule
+      * @throws NullPointerException if taskNode is null
+      * @throws IllegalStateException if the task still has active threads or is not valid
+      */
+     public void schedule(final ReportingTaskNode taskNode) {
+         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
+         if (scheduleState.isScheduled()) {
+             return;
+         }
+
+         final int activeThreadCount = scheduleState.getActiveThreadCount();
+         if (activeThreadCount > 0) {
+             throw new IllegalStateException("Reporting Task " + taskNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
+         }
+
+         if (!taskNode.isValid()) {
+             throw new IllegalStateException("Reporting Task " + taskNode.getName() + " is not in a valid state for the following reasons: " + taskNode.getValidationErrors());
+         }
+
+         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
+         scheduleState.setScheduled(true);
+
+         final Runnable startReportingTaskRunnable = new Runnable() {
++            @SuppressWarnings("deprecation")
+             @Override
+             public void run() {
++                // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
+                 while (true) {
+                     final ReportingTask reportingTask = taskNode.getReportingTask();
+
+                     try {
+                         try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                             ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
++                            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
+                         }
++
+                         break;
+                     } catch (final InvocationTargetException ite) {
-                         LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                                 new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
+                         LOG.error("", ite.getTargetException());
+
+                         try {
+                             Thread.sleep(administrativeYieldMillis);
+                         } catch (final InterruptedException ie) {
+                             // An interrupt only cuts the wait short; the loop then retries
+                         }
+                     } catch (final Exception e) {
-                         LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                                 // Pass the arguments flat, with the exception last, so SLF4J fills all three
+                                 // placeholders and logs the stack trace
+                                 reportingTask, e.toString(), administrativeYieldDuration, e);
+                         try {
+                             Thread.sleep(administrativeYieldMillis);
+                         } catch (final InterruptedException ie) {
+                             // An interrupt only cuts the wait short; the loop then retries
+                         }
+                     }
+                 }
+
+                 agent.schedule(taskNode, scheduleState);
+             }
+         };
+
+         componentLifeCycleThreadPool.execute(startReportingTaskRunnable);
+     }
+
+     /**
+      * Marks the given Reporting Task as no longer scheduled and stops it asynchronously on the
+      * component life-cycle thread pool. The stop task invokes the Reporting Task's
+      * {@link OnUnscheduled} methods, removes the task from its scheduling agent and, if no threads
+      * are active and the schedule state requires it, invokes the {@link OnStopped} methods.
+      * Does nothing if the task is not scheduled.
+      *
+      * @param taskNode the Reporting Task to unschedule
+      * @throws NullPointerException if taskNode is null
+      */
+     public void unschedule(final ReportingTaskNode taskNode) {
+         final ScheduleState scheduleState = getScheduleState(requireNonNull(taskNode));
+         if (!scheduleState.isScheduled()) {
+             return;
+         }
+
+         final SchedulingAgent agent = getSchedulingAgent(taskNode.getSchedulingStrategy());
+         final ReportingTask reportingTask = taskNode.getReportingTask();
+         scheduleState.setScheduled(false);
+
+         final Runnable unscheduleReportingTaskRunnable = new Runnable() {
++            @SuppressWarnings("deprecation")
+             @Override
+             public void run() {
+                 final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
+
-                 while (true) {
-                     try {
-                         try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                             ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, reportingTask, configurationContext);
-                         }
-                         break;
-                     } catch (final InvocationTargetException ite) {
-                         LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                 new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
-                         LOG.error("", ite.getTargetException());
++                try {
++                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
++                        ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
++                    }
++                } catch (final InvocationTargetException ite) {
++                    LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                            new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
++                    LOG.error("", ite.getTargetException());
+
-                         try {
-                             Thread.sleep(administrativeYieldMillis);
-                         } catch (final InterruptedException ie) {
-                         }
-                     } catch (final Exception e) {
-                         LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                 new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
-                         try {
-                             Thread.sleep(administrativeYieldMillis);
-                         } catch (final InterruptedException ie) {
-                         }
++                    try {
++                        Thread.sleep(administrativeYieldMillis);
++                    } catch (final InterruptedException ie) {
++                    }
++                } catch (final Exception e) {
++                    LOG.error("Failed to invoke the @OnUnscheduled methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
++                            reportingTask, e.toString(), administrativeYieldDuration, e);
++                    try {
++                        Thread.sleep(administrativeYieldMillis);
++                    } catch (final InterruptedException ie) {
+                     }
+                 }
+
+                 agent.unschedule(taskNode, scheduleState);
+
-                 if (scheduleState.getActiveThreadCount() == 0) {
-                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, reportingTask, configurationContext);
++                if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
++                    ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
+                 }
+             }
+         };
+
+         componentLifeCycleThreadPool.execute(unscheduleReportingTaskRunnable);
+     }
+
+     /**
+      * Starts scheduling the given processor to run after invoking all methods
+      * on the underlying {@link org.apache.nifi.processor.Processor Processor}
+      * that are annotated with the {@link OnScheduled} annotation. The annotated
+      * methods are invoked asynchronously on the component life-cycle thread pool;
+      * if they fail, the attempt is repeated after the administrative yield duration
+      * for as long as the processor remains scheduled.
+      *
+      * @param procNode the processor to start
+      * @throws NullPointerException if procNode is null
+      * @throws IllegalStateException if the processor is disabled, still has active
+      * threads, or is not valid
+      */
+     @Override
+     public synchronized void startProcessor(final ProcessorNode procNode) {
+         // Check for null before the first dereference so that the check is not dead code
+         requireNonNull(procNode);
+         if (procNode.getScheduledState() == ScheduledState.DISABLED) {
+             throw new IllegalStateException(procNode + " is disabled, so it cannot be started");
+         }
+         final ScheduleState scheduleState = getScheduleState(procNode);
+
+         if (scheduleState.isScheduled()) {
+             return;
+         }
+
+         final int activeThreadCount = scheduleState.getActiveThreadCount();
+         if (activeThreadCount > 0) {
+             throw new IllegalStateException("Processor " + procNode.getName() + " cannot be started because it has " + activeThreadCount + " threads still running");
+         }
+
+         if (!procNode.isValid()) {
+             throw new IllegalStateException("Processor " + procNode.getName() + " is not in a valid state");
+         }
+
+         final Runnable startProcRunnable = new Runnable() {
++            @SuppressWarnings("deprecation")
+             @Override
+             public void run() {
+                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                     long lastStopTime = scheduleState.getLastStopTime();
+                     final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+
+                     while (true) {
+                         try {
+                             synchronized (scheduleState) {
+                                 // if no longer scheduled to run, then we're finished. This can happen, for example,
+                                 // if the @OnScheduled method throws an Exception and the user stops the processor
+                                 // while we're administratively yielded.
+                                 //
+                                 // we also check if the schedule state's last stop time is equal to what it was before.
+                                 // if not, then means that the processor has been stopped and started again, so we should just
+                                 // bail; another thread will be responsible for invoking the @OnScheduled methods.
+                                 if (!scheduleState.isScheduled() || scheduleState.getLastStopTime() != lastStopTime) {
+                                     return;
+                                 }
+
+                                 final SchedulingContext schedulingContext = new StandardSchedulingContext(processContext, controllerServiceProvider, procNode);
-                                 ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, procNode.getProcessor(), schedulingContext);
++                                ReflectionUtils.invokeMethodsWithAnnotation(OnScheduled.class, org.apache.nifi.processor.annotation.OnScheduled.class, procNode.getProcessor(), schedulingContext);
+
+                                 getSchedulingAgent(procNode).schedule(procNode, scheduleState);
+
+                                 heartbeater.heartbeat();
+                                 return;
+                             }
+                         } catch (final Exception e) {
+                             // getCause() returns null when the exception has no cause; fall back to the
+                             // exception itself so that logging the failure cannot throw a NullPointerException
+                             final Throwable cause = (e.getCause() == null) ? e : e.getCause();
+                             final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+
+                             procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run for {}",
+                                     new Object[]{procNode.getProcessor(), cause, administrativeYieldDuration}, cause);
+                             LOG.error("Failed to invoke @OnScheduled method due to {}", cause.toString(), cause);
+
+                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
+                             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
+
+                             Thread.sleep(administrativeYieldMillis);
+                             continue;
+                         }
+                     }
+                 } catch (final Throwable t) {
+                     final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+                     procLog.error("{} failed to invoke @OnScheduled method due to {}; processor will not be scheduled to run", new Object[]{procNode.getProcessor(), t});
+                     LOG.error("Failed to invoke @OnScheduled method due to {}", t.toString(), t);
+                 }
+             }
+         };
+
+         scheduleState.setScheduled(true);
+         procNode.setScheduledState(ScheduledState.RUNNING);
+
+         componentLifeCycleThreadPool.execute(startProcRunnable);
+     }
+
+     /**
+      * Used to delay scheduling the given Processor to run until its yield
+      * duration expires. This implementation intentionally does nothing; the
+      * comment in the method body records why.
+      *
+      * @param procNode the Processor that yielded
+      */
+     @Override
+     public void yield(final ProcessorNode procNode) {
+         // This exists in the ProcessScheduler so that the scheduler can take advantage of the fact that
+         // the Processor was yielded and, as a result, avoid scheduling the Processor to potentially run
+         // (thereby skipping the overhead of the Context Switches) if nothing can be done.
+         //
+         // We used to implement this feature by canceling all futures for the given Processor and
+         // re-submitting them with a delay. However, this became problematic, because we have situations where
+         // a Processor will wait several seconds (often 30 seconds in the case of a network timeout), and then yield
+         // the context. If this Processor has X number of threads, we end up submitting X new tasks while the previous
+         // X-1 tasks are still running. At this point, another thread could finish and do the same thing, resulting in
+         // an additional X-1 extra tasks being submitted.
+         //
+         // As a result, we simply removed this buggy implementation, as it was a very minor performance optimization
+         // that gave very bad results.
+     }
+
+     /**
+      * Stops scheduling the given processor to run and invokes all methods on
+      * the underlying {@link org.apache.nifi.processor.Processor Processor} that
+      * are annotated with the {@link OnUnscheduled} annotation.
+      * <p>
+      * The processor is removed from its scheduling agent and marked STOPPED
+      * before this method returns; the annotated methods are invoked
+      * asynchronously on the component life-cycle thread pool.
+      *
+      * @param procNode the processor to stop
+      * @throws NullPointerException if procNode is null
+      */
+     @Override
+     public synchronized void stopProcessor(final ProcessorNode procNode) {
+         final ScheduleState state = getScheduleState(requireNonNull(procNode));
+
+         // Hold the state's monitor so the scheduled flag is checked and cleared as one step
+         synchronized (state) {
+             if (!state.isScheduled()) {
+                 // Not scheduled: only record the STOPPED state; no lifecycle methods are invoked
+                 procNode.setScheduledState(ScheduledState.STOPPED);
+                 return;
+             }
+
+             getSchedulingAgent(procNode).unschedule(procNode, state);
+             procNode.setScheduledState(ScheduledState.STOPPED);
+             state.setScheduled(false);
+         }
+
+         final Runnable stopProcRunnable = new Runnable() {
+             @Override
+             public void run() {
+                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                     final StandardProcessContext processContext = new StandardProcessContext(procNode, controllerServiceProvider, encryptor);
+
+                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnUnscheduled.class, procNode.getProcessor(), processContext);
+
+                     // If no threads currently running, call the OnStopped methods
+                     if (state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods()) {
+                         ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
+                         heartbeater.heartbeat();
+                     }
+                 }
+             }
+         };
+
+         componentLifeCycleThreadPool.execute(stopProcRunnable);
+     }
+
+     /**
+      * Notifies the scheduling agent responsible for the given Connectable that an event
+      * has occurred for it.
+      */
+     @Override
+     public void registerEvent(final Connectable worker) {
+         final SchedulingAgent responsibleAgent = getSchedulingAgent(worker);
+         responsibleAgent.onEvent(worker);
+     }
+
+     /**
+      * Returns the number of threads that are currently active for the given
+      * component. A schedule state is created and registered for the component
+      * if none exists yet.
+      *
+      * @param scheduled the component whose active threads are counted
+      * @return the active thread count recorded in the component's schedule state
+      */
+     @Override
+     public int getActiveThreadCount(final Object scheduled) {
+         final ScheduleState stateOfComponent = getScheduleState(scheduled);
+         return stateOfComponent.getActiveThreadCount();
+     }
+
+     /**
+      * Begins scheduling the given port to run: the port is told that scheduling
+      * is starting and is then handed to the common Connectable start logic.
+      *
+      * @param port the port to start
+      * @throws NullPointerException if the Port is null
+      * @throws IllegalStateException if the Port is not valid
+      */
+     @Override
+     public void startPort(final Port port) {
+         final boolean valid = port.isValid();
+         if (!valid) {
+             throw new IllegalStateException("Port " + port.getName() + " is not in a valid state");
+         }
+
+         port.onSchedulingStart();
+         startConnectable(port);
+     }
+
+     /**
+      * Begins scheduling the given funnel to run and then marks it as RUNNING.
+      *
+      * @param funnel the funnel to start
+      */
+     @Override
+     public void startFunnel(final Funnel funnel) {
+         startConnectable(funnel);
+         funnel.setScheduledState(ScheduledState.RUNNING);
+     }
+
+     /**
+      * Stops scheduling the given port to run and then shuts the port down.
+      *
+      * @param port the port to stop
+      */
+     @Override
+     public void stopPort(final Port port) {
+         stopConnectable(port);
+         port.shutdown();
+     }
+
+     /**
+      * Stops scheduling the given funnel to run and then marks it as STOPPED.
+      *
+      * @param funnel the funnel to stop
+      */
+     @Override
+     public void stopFunnel(final Funnel funnel) {
+         stopConnectable(funnel);
+         funnel.setScheduledState(ScheduledState.STOPPED);
+     }
+
+     /**
+      * Hands the given Connectable to its scheduling agent and marks it as scheduled.
+      * Does nothing if the Connectable is already scheduled.
+      *
+      * @param connectable the component to start
+      * @throws NullPointerException if connectable is null
+      * @throws IllegalStateException if the component is disabled or still has active threads
+      */
+     private synchronized void startConnectable(final Connectable connectable) {
+         // Check for null before the first dereference so that the check is not dead code
+         requireNonNull(connectable);
+         if (connectable.getScheduledState() == ScheduledState.DISABLED) {
+             throw new IllegalStateException(connectable + " is disabled, so it cannot be started");
+         }
+
+         final ScheduleState scheduleState = getScheduleState(connectable);
+         if (scheduleState.isScheduled()) {
+             return;
+         }
+
+         final int activeThreads = scheduleState.getActiveThreadCount();
+         if (activeThreads > 0) {
+             throw new IllegalStateException("Port cannot be scheduled to run until its last " + activeThreads + " threads finish");
+         }
+
+         getSchedulingAgent(connectable).schedule(connectable, scheduleState);
+         scheduleState.setScheduled(true);
+     }
+
+     /**
+      * Marks the given Connectable as no longer scheduled and removes it from its scheduling
+      * agent. If it is then still unscheduled, has no active threads and its schedule state
+      * requires it, the {@link OnStopped} methods are invoked and a heartbeat is sent.
+      * Does nothing if the Connectable is not scheduled.
+      *
+      * @throws NullPointerException if connectable is null
+      */
+     private synchronized void stopConnectable(final Connectable connectable) {
+         final ScheduleState state = getScheduleState(requireNonNull(connectable));
+         if (state.isScheduled()) {
+             state.setScheduled(false);
+
+             getSchedulingAgent(connectable).unschedule(connectable, state);
+
+             final boolean invokeOnStopped = !state.isScheduled() && state.getActiveThreadCount() == 0 && state.mustCallOnStoppedMethods();
+             if (invokeOnStopped) {
+                 final ConnectableProcessContext context = new ConnectableProcessContext(connectable, encryptor);
+                 try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
+                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, context);
+                     heartbeater.heartbeat();
+                 }
+             }
+         }
+     }
+
+     /**
+      * Moves the given funnel from the DISABLED state to the STOPPED state.
+      *
+      * @throws IllegalStateException if the funnel is not currently disabled
+      */
+     @Override
+     public synchronized void enableFunnel(final Funnel funnel) {
+         final boolean currentlyDisabled = funnel.getScheduledState() == ScheduledState.DISABLED;
+         if (!currentlyDisabled) {
+             throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
+         }
+         funnel.setScheduledState(ScheduledState.STOPPED);
+     }
+
+     /**
+      * Moves the given funnel from the STOPPED state to the DISABLED state.
+      *
+      * @throws IllegalStateException if the funnel is not currently stopped
+      */
+     @Override
+     public synchronized void disableFunnel(final Funnel funnel) {
+         if (funnel.getScheduledState() != ScheduledState.STOPPED) {
+             // Message previously repeated the words "its state"
+             throw new IllegalStateException("Funnel cannot be disabled because its state is set to " + funnel.getScheduledState());
+         }
+         funnel.setScheduledState(ScheduledState.DISABLED);
+     }
+
+     /**
+      * Disables the given port, which must currently be stopped.
+      *
+      * @throws IllegalStateException if the port is not currently stopped
+      * @throws IllegalArgumentException if the port is not an {@link AbstractPort}
+      */
+     @Override
+     public synchronized void disablePort(final Port port) {
+         final ScheduledState currentState = port.getScheduledState();
+         if (currentState != ScheduledState.STOPPED) {
+             throw new IllegalStateException("Port cannot be disabled because its state is set to " + port.getScheduledState());
+         }
+
+         // Only AbstractPort exposes disable(); reject any other implementation
+         if (port instanceof AbstractPort) {
+             final AbstractPort abstractPort = (AbstractPort) port;
+             abstractPort.disable();
+             return;
+         }
+
+         throw new IllegalArgumentException();
+     }
+
+ @Override
- public synchronized void disableProcessor(final ProcessorNode procNode) {
- if (procNode.getScheduledState() != ScheduledState.STOPPED) {
- throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
- }
- procNode.setScheduledState(ScheduledState.DISABLED);
- }
-
- @Override
+     public synchronized void enablePort(final Port port) {
+         // Enables the given port, which must currently be disabled and must be an AbstractPort.
+         if (port.getScheduledState() != ScheduledState.DISABLED) {
+             // Message previously said "Funnel", which misidentified the component
+             throw new IllegalStateException("Port cannot be enabled because it is not disabled");
+         }
+
+         // Only AbstractPort exposes enable(); reject any other implementation
+         if (!(port instanceof AbstractPort)) {
+             throw new IllegalArgumentException();
+         }
+
+         ((AbstractPort) port).enable();
+     }
+
+     /**
+      * Moves the given processor from the DISABLED state to the STOPPED state and then
+      * invokes the processor's {@link OnEnabled} methods.
+      *
+      * @throws IllegalStateException if the processor is not currently disabled
+      */
+     @Override
+     public synchronized void enableProcessor(final ProcessorNode procNode) {
+         final ScheduledState currentState = procNode.getScheduledState();
+         if (currentState != ScheduledState.DISABLED) {
+             throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
+         }
++
+         procNode.setScheduledState(ScheduledState.STOPPED);
++
++        try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
++            final ProcessorLog logger = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), logger);
++        }
+     }
+
++    /**
++     * Moves the given processor from the STOPPED state to the DISABLED state and then
++     * invokes the processor's {@link OnDisabled} methods.
++     *
++     * @throws IllegalStateException if the processor is not currently stopped
++     */
+     @Override
++    public synchronized void disableProcessor(final ProcessorNode procNode) {
++        final ScheduledState currentState = procNode.getScheduledState();
++        if (currentState != ScheduledState.STOPPED) {
++            throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
++        }
++
++        procNode.setScheduledState(ScheduledState.DISABLED);
++
++        try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
++            final ProcessorLog logger = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), logger);
++        }
++    }
++
++    /**
++     * Moves the given Reporting Task from the DISABLED state to the STOPPED state and then
++     * invokes the task's {@link OnEnabled} methods.
++     *
++     * @throws IllegalStateException if the Reporting Task is not currently disabled
++     */
++    public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
++        final boolean currentlyDisabled = taskNode.getScheduledState() == ScheduledState.DISABLED;
++        if (!currentlyDisabled) {
++            throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
++        }
++
++        taskNode.setScheduledState(ScheduledState.STOPPED);
++
++        try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
++        }
++    }
++
++    /**
++     * Moves the given Reporting Task from the STOPPED state to the DISABLED state and then
++     * invokes the task's {@link OnDisabled} methods.
++     *
++     * @throws IllegalStateException if the Reporting Task is not currently stopped
++     */
++    public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
++        final boolean currentlyStopped = taskNode.getScheduledState() == ScheduledState.STOPPED;
++        if (!currentlyStopped) {
++            throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
++        }
++
++        taskNode.setScheduledState(ScheduledState.DISABLED);
++
++        try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
++        }
++    }
++
++    /**
++     * Marks the given Controller Service as enabled and then invokes the service's
++     * {@link OnEnabled} methods.
++     *
++     * @throws IllegalStateException if the Controller Service is not currently disabled
++     */
++    public synchronized void enableControllerService(final ControllerServiceNode serviceNode) {
++        final boolean currentlyDisabled = serviceNode.isDisabled();
++        if (!currentlyDisabled) {
++            throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled");
++        }
++
++        // The flag is flipped before the @OnEnabled methods run. Disabling has to flip the flag first
++        // (serviceNode.setDisabled(true) will throw Exceptions if the service is currently known to be
++        // in use), and enabling uses the same ordering so that the two operations stay consistent.
++        serviceNode.setDisabled(false);
++
++        try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation());
++        }
++    }
++
++    /**
++     * Marks the given Controller Service as disabled and then invokes the service's
++     * {@link OnDisabled} methods.
++     *
++     * @throws IllegalStateException if the Controller Service is already disabled
++     */
++    public synchronized void disableControllerService(final ControllerServiceNode serviceNode) {
++        final boolean alreadyDisabled = serviceNode.isDisabled();
++        if (alreadyDisabled) {
++            throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled");
++        }
++
++        // The flag must be flipped before the OnDisabled methods run, because the service node
++        // can throw Exceptions if we attempt to disable the service while it's known to be in use.
++        serviceNode.setDisabled(true);
++
++        try (final NarCloseable narLoader = NarCloseable.withNarLoader()) {
++            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
++        }
++    }
++
++
++    /**
++     * Indicates whether the given component is currently scheduled. A component with no
++     * registered schedule state is reported as not scheduled; no state is created for it.
++     */
++    @Override
+     public boolean isScheduled(final Object scheduled) {
+         final ScheduleState registeredState = scheduleStates.get(scheduled);
+         if (registeredState == null) {
+             return false;
+         }
+         return registeredState.isScheduled();
+     }
+
+ /**
- * Returns the ScheduleState that is registered for the given ProcessorNode;
++ * Returns the ScheduleState that is registered for the given component;
+ * if no ScheduleState is currently registered, one is created and registered
+ * atomically, and then that value is returned.
+ *
+ * @param schedulable the component whose ScheduleState is wanted
+ * @return the ScheduleState registered for the given component
+ */
+     private ScheduleState getScheduleState(final Object schedulable) {
+         final ScheduleState existing = scheduleStates.get(schedulable);
+         if (existing != null) {
+             return existing;
+         }
+
+         // Nothing registered yet: try to register a fresh state. If another thread registered one
+         // in the meantime, putIfAbsent returns that state and it is used instead.
+         final ScheduleState created = new ScheduleState();
+         final ScheduleState registeredByOther = scheduleStates.putIfAbsent(schedulable, created);
+         return (registeredByOther == null) ? created : registeredByOther;
+     }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 0000000,42bd55f..9fec307
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@@ -1,0 -1,156 +1,154 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.service;
+
+ import java.io.BufferedInputStream;
+ import java.io.File;
+ import java.io.IOException;
+ import java.io.InputStream;
+ import java.net.URL;
+ import java.nio.file.Files;
+ import java.nio.file.Path;
+ import java.nio.file.StandardOpenOption;
+ import java.util.ArrayList;
+ import java.util.HashMap;
+ import java.util.List;
+ import java.util.Map;
+
+ import javax.xml.XMLConstants;
+ import javax.xml.parsers.DocumentBuilder;
+ import javax.xml.parsers.DocumentBuilderFactory;
+ import javax.xml.parsers.ParserConfigurationException;
+ import javax.xml.validation.Schema;
+ import javax.xml.validation.SchemaFactory;
+
+ import org.apache.nifi.util.file.FileUtils;
+ import org.apache.nifi.util.DomUtils;
+
+ import org.apache.commons.logging.Log;
+ import org.apache.commons.logging.LogFactory;
+ import org.w3c.dom.Document;
+ import org.w3c.dom.Element;
+ import org.w3c.dom.NodeList;
+ import org.xml.sax.SAXException;
+ import org.xml.sax.SAXParseException;
+
+ /**
+ *
+ */
+ public class ControllerServiceLoader {
+
+ private static final Log logger = LogFactory.getLog(ControllerServiceLoader.class);
+
+ private final Path serviceConfigXmlPath;
+
+ public ControllerServiceLoader(final Path serviceConfigXmlPath) throws IOException {
+ final File serviceConfigXmlFile = serviceConfigXmlPath.toFile();
+ if (!serviceConfigXmlFile.exists() || !serviceConfigXmlFile.canRead()) {
+ throw new IOException(serviceConfigXmlPath + " does not appear to exist or cannot be read. Cannot load configuration.");
+ }
+
+ this.serviceConfigXmlPath = serviceConfigXmlPath;
+ }
+
+ public List<ControllerServiceNode> loadControllerServices(final ControllerServiceProvider provider) throws IOException {
+ final SchemaFactory schemaFactory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
+ InputStream fis = null;
+ BufferedInputStream bis = null;
+ documentBuilderFactory.setNamespaceAware(true);
+
+ final List<ControllerServiceNode> services = new ArrayList<>();
+
+ try {
+ final URL configurationResource = this.getClass().getResource("/ControllerServiceConfiguration.xsd");
+ if (configurationResource == null) {
+ throw new NullPointerException("Unable to load XML Schema for ControllerServiceConfiguration");
+ }
+ final Schema schema = schemaFactory.newSchema(configurationResource);
+ documentBuilderFactory.setSchema(schema);
+ final DocumentBuilder builder = documentBuilderFactory.newDocumentBuilder();
+
+ builder.setErrorHandler(new org.xml.sax.ErrorHandler() {
+
+ @Override
+ public void fatalError(final SAXParseException err) throws SAXException {
+ logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.error("Error Stack Dump", err);
+ }
+ throw err;
+ }
+
+ @Override
+ public void error(final SAXParseException err) throws SAXParseException {
+ logger.error("Config file line " + err.getLineNumber() + ", col " + err.getColumnNumber() + ", uri " + err.getSystemId() + " :message: " + err.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.error("Error Stack Dump", err);
+ }
+ throw err;
+ }
+
+ @Override
+ public void warning(final SAXParseException err) throws SAXParseException {
+ logger.warn(" Config file line " + err.getLineNumber() + ", uri " + err.getSystemId() + " : message : " + err.getMessage());
+ if (logger.isDebugEnabled()) {
+ logger.warn("Warning stack dump", err);
+ }
+ throw err;
+ }
+ });
+
+ //open the service configuration file for reading; it is parsed below only if it is non-empty
+ fis = Files.newInputStream(this.serviceConfigXmlPath, StandardOpenOption.READ);
+ bis = new BufferedInputStream(fis);
+ if (Files.size(this.serviceConfigXmlPath) > 0) {
+ final Document document = builder.parse(bis);
+ final NodeList servicesNodes = document.getElementsByTagName("services");
+ final Element servicesElement = (Element) servicesNodes.item(0);
+
+ final List<Element> serviceNodes = DomUtils.getChildElementsByTagName(servicesElement, "service");
+ for (final Element serviceElement : serviceNodes) {
- //add global properties common to all tasks
- Map<String, String> properties = new HashMap<>();
-
+ //get the settings for the specific controller service - the identifier
+ //and class child elements are read (and trimmed) here
+ final String serviceId = DomUtils.getChild(serviceElement, "identifier").getTextContent().trim();
+ final String serviceClass = DomUtils.getChild(serviceElement, "class").getTextContent().trim();
+
++ //set the class to be used for the configured controller task
++ final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, false);
++
+ //optional task-specific properties
+ for (final Element optionalProperty : DomUtils.getChildElementsByTagName(serviceElement, "property")) {
+ final String name = optionalProperty.getAttribute("name").trim();
+ final String value = optionalProperty.getTextContent().trim();
- properties.put(name, value);
++ serviceNode.setProperty(name, value);
+ }
+
- //set the class to be used for the configured controller task
- final ControllerServiceNode serviceNode = provider.createControllerService(serviceClass, serviceId, properties);
+ services.add(serviceNode);
+ serviceNode.setDisabled(false);
+ }
+ }
+ } catch (SAXException | ParserConfigurationException sxe) {
+ throw new IOException(sxe);
+ } finally {
+ FileUtils.closeQuietly(fis);
+ FileUtils.closeQuietly(bis);
+ }
+
+ return services;
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 0000000,455eac1..4681293
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@@ -1,0 -1,125 +1,195 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.service;
+
+ import java.util.HashSet;
+ import java.util.Set;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ import java.util.concurrent.atomic.AtomicReference;
+ import java.util.concurrent.locks.Lock;
+ import java.util.concurrent.locks.ReadWriteLock;
+ import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+ import org.apache.nifi.controller.AbstractConfiguredComponent;
+ import org.apache.nifi.controller.Availability;
++import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.ConfiguredComponent;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ValidationContextFactory;
++import org.apache.nifi.controller.annotation.OnConfigured;
++import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
++import org.apache.nifi.nar.NarCloseable;
++import org.apache.nifi.util.ReflectionUtils;
+
+ public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
+
- private final ControllerService controllerService;
++ private final ControllerService proxedControllerService;
++ private final ControllerService implementation;
++ private final ControllerServiceProvider serviceProvider;
+
+ private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
+ private final AtomicBoolean disabled = new AtomicBoolean(true);
+
+ private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ private final Lock readLock = rwLock.readLock();
+ private final Lock writeLock = rwLock.writeLock();
+
+ private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
+
- public StandardControllerServiceNode(final ControllerService controllerService, final String id,
++ public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
+ final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
- super(controllerService, id, validationContextFactory, serviceProvider);
- this.controllerService = controllerService;
++ super(proxiedControllerService, id, validationContextFactory, serviceProvider);
++ this.proxedControllerService = proxiedControllerService;
++ this.implementation = implementation;
++ this.serviceProvider = serviceProvider;
+ }
+
+ @Override
+ public boolean isDisabled() {
+ return disabled.get();
+ }
+
+ @Override
+ public void setDisabled(final boolean disabled) {
+ if (!disabled && !isValid()) {
- throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid");
++ throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
+ }
+
+ if (disabled) {
+ // do not allow a Controller Service to be disabled if it's currently being used.
+ final Set<ConfiguredComponent> runningRefs = getReferences().getRunningReferences();
+ if (!runningRefs.isEmpty()) {
+ throw new IllegalStateException("Cannot disable Controller Service because it is referenced (either directly or indirectly) by " + runningRefs.size() + " different components that are currently running");
+ }
+ }
+
+ this.disabled.set(disabled);
+ }
+
+ @Override
+ public Availability getAvailability() {
+ return availability.get();
+ }
+
+ @Override
+ public void setAvailability(final Availability availability) {
+ this.availability.set(availability);
+ }
+
+ @Override
- public ControllerService getControllerService() {
- return controllerService;
++ public ControllerService getProxiedControllerService() {
++ return proxedControllerService;
++ }
++
++ @Override
++ public ControllerService getControllerServiceImplementation() {
++ return implementation;
+ }
+
+ @Override
+ public ControllerServiceReference getReferences() {
+ readLock.lock();
+ try {
+ return new StandardControllerServiceReference(this, referencingComponents);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ @Override
+ public void addReference(final ConfiguredComponent referencingComponent) {
+ writeLock.lock();
+ try {
+ referencingComponents.add(referencingComponent);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void removeReference(final ConfiguredComponent referencingComponent) {
+ writeLock.lock();
+ try {
+ referencingComponents.remove(referencingComponent);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void verifyModifiable() throws IllegalStateException {
+ if (!isDisabled()) {
+ throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
+ }
+ }
++
++ @Override
++ public void setProperty(final String name, final String value) {
++ super.setProperty(name, value);
++
++ onConfigured();
++ }
++
++ @Override
++ public boolean removeProperty(String name) {
++ final boolean removed = super.removeProperty(name);
++ if ( removed ) {
++ onConfigured();
++ }
++
++ return removed;
++ }
++
++ private void onConfigured() {
++ try (final NarCloseable x = NarCloseable.withNarLoader()) {
++ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
++ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
++ } catch (final Exception e) {
++ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
++ }
++ }
++
++ @Override
++ public void verifyCanDelete() {
++ if ( !isDisabled() ) {
++ throw new IllegalStateException(implementation + " cannot be deleted because it is not disabled");
++ }
++ }
++
++ @Override
++ public void verifyCanDisable() {
++ final ControllerServiceReference references = getReferences();
++ final int numRunning = references.getRunningReferences().size();
++ if ( numRunning > 0 ) {
++ throw new IllegalStateException(implementation + " cannot be disabled because it is referenced by " + numRunning + " components that are currently running");
++ }
++ }
++
++ @Override
++ public void verifyCanEnable() {
++ if ( !isDisabled() ) {
++ throw new IllegalStateException(implementation + " cannot be enabled because it is not disabled");
++ }
++ }
++
++ @Override
++ public void verifyCanUpdate() {
++ if ( !isDisabled() ) {
++ throw new IllegalStateException(implementation + " cannot be updated because it is not disabled");
++ }
++ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index 0000000,fc07ce1..cc7a18a
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@@ -1,0 -1,196 +1,219 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.service;
+
+ import static java.util.Objects.requireNonNull;
+
+ import java.lang.reflect.InvocationHandler;
+ import java.lang.reflect.InvocationTargetException;
+ import java.lang.reflect.Method;
+ import java.lang.reflect.Proxy;
+ import java.util.ArrayList;
+ import java.util.Collections;
+ import java.util.HashSet;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ import java.util.concurrent.ConcurrentHashMap;
+
++import org.apache.nifi.annotation.lifecycle.OnAdded;
++import org.apache.nifi.annotation.lifecycle.OnRemoved;
++import org.apache.nifi.controller.ConfigurationContext;
+ import org.apache.nifi.controller.ControllerService;
+ import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.annotation.OnConfigured;
+ import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
+ import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
++import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+ import org.apache.nifi.nar.ExtensionManager;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.StandardValidationContextFactory;
+ import org.apache.nifi.util.ObjectHolder;
+ import org.apache.nifi.util.ReflectionUtils;
-
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ /**
+ *
+ */
+ public class StandardControllerServiceProvider implements ControllerServiceProvider {
+
+ private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
+
+ private final Map<String, ControllerServiceNode> controllerServices;
+ private static final Set<Method> validDisabledMethods;
+
+ static {
+ // methods that are okay to be called when the service is disabled.
+ final Set<Method> validMethods = new HashSet<>();
+ for (final Method method : ControllerService.class.getMethods()) {
+ validMethods.add(method);
+ }
+ for (final Method method : Object.class.getMethods()) {
+ validMethods.add(method);
+ }
+ validDisabledMethods = Collections.unmodifiableSet(validMethods);
+ }
+
+ public StandardControllerServiceProvider() {
+ // the map of controller services is a ConcurrentHashMap, so we do not lock around it; it is modified
+ // in the createControllerService method (put) and in the removeControllerService method (remove)
+ this.controllerServices = new ConcurrentHashMap<>();
+ }
+
+ private Class<?>[] getInterfaces(final Class<?> cls) {
+ final List<Class<?>> allIfcs = new ArrayList<>();
+ populateInterfaces(cls, allIfcs);
+ return allIfcs.toArray(new Class<?>[allIfcs.size()]);
+ }
+
+ private void populateInterfaces(final Class<?> cls, final List<Class<?>> interfacesDefinedThusFar) {
+ final Class<?>[] ifc = cls.getInterfaces();
+ if (ifc != null && ifc.length > 0) {
+ for (final Class<?> i : ifc) {
+ interfacesDefinedThusFar.add(i);
+ }
+ }
+
+ final Class<?> superClass = cls.getSuperclass();
+ if (superClass != null) {
+ populateInterfaces(superClass, interfacesDefinedThusFar);
+ }
+ }
+
+ @Override
- public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
++ public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+ if (type == null || id == null) {
+ throw new NullPointerException();
+ }
+ if (controllerServices.containsKey(id)) {
+ throw new ControllerServiceAlreadyExistsException(id);
+ }
+
+ final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
+ try {
+ final ClassLoader cl = ExtensionManager.getClassLoader(type);
+ Thread.currentThread().setContextClassLoader(cl);
+ final Class<?> rawClass = Class.forName(type, false, cl);
+ final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
+
+ final ControllerService originalService = controllerServiceClass.newInstance();
+ final ObjectHolder<ControllerServiceNode> serviceNodeHolder = new ObjectHolder<>(null);
+ final InvocationHandler invocationHandler = new InvocationHandler() {
+ @Override
+ public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
+ final ControllerServiceNode node = serviceNodeHolder.get();
+ if (node.isDisabled() && !validDisabledMethods.contains(method)) {
++ // Use nar class loader here because we are implicitly calling toString() on the original implementation.
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service " + originalService + " because the Controller Service is disabled");
+ } catch (final Throwable e) {
+ throw new IllegalStateException("Cannot invoke method " + method + " on Controller Service with identifier " + id + " because the Controller Service is disabled");
+ }
+ }
+
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ return method.invoke(originalService, args);
+ } catch (final InvocationTargetException e) {
+ // If the ControllerService throws an Exception, it'll be wrapped in an InvocationTargetException. We want
+ // to instead re-throw what the ControllerService threw, so we pull it out of the InvocationTargetException.
+ throw e.getCause();
+ }
+ }
+ };
+
+ final ControllerService proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
+ logger.info("Loaded service {} as configured.", type);
+
+ originalService.initialize(new StandardControllerServiceInitializationContext(id, this));
+
+ final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
+
- final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, id, validationContextFactory, this);
++ final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
+ serviceNodeHolder.set(serviceNode);
+ serviceNode.setAnnotationData(null);
+ serviceNode.setName(id);
- for (final Map.Entry<String, String> entry : properties.entrySet()) {
- serviceNode.setProperty(entry.getKey(), entry.getValue());
++
++ if ( firstTimeAdded ) {
++ try (final NarCloseable x = NarCloseable.withNarLoader()) {
++ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
++ } catch (final Exception e) {
++ throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
++ }
+ }
- final StandardConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, originalService, configurationContext);
+
+ this.controllerServices.put(id, serviceNode);
+ return serviceNode;
+ } catch (final Throwable t) {
+ throw new ControllerServiceNotFoundException(t);
+ } finally {
+ if (currentContextClassLoader != null) {
+ Thread.currentThread().setContextClassLoader(currentContextClassLoader);
+ }
+ }
+ }
+
+ @Override
+ public ControllerService getControllerService(final String serviceIdentifier) {
+ final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
- return (node == null) ? null : node.getControllerService();
++ return (node == null) ? null : node.getProxiedControllerService();
+ }
+
+ @Override
+ public boolean isControllerServiceEnabled(final ControllerService service) {
+ return isControllerServiceEnabled(service.getIdentifier());
+ }
+
+ @Override
+ public boolean isControllerServiceEnabled(final String serviceIdentifier) {
+ final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
+ return (node == null) ? false : !node.isDisabled();
+ }
+
+ @Override
+ public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
+ return controllerServices.get(serviceIdentifier);
+ }
+
+ @Override
+ public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
+ final Set<String> identifiers = new HashSet<>();
+ for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
- if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass())) {
++ if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
+ identifiers.add(entry.getKey());
+ }
+ }
+
+ return identifiers;
+ }
++
++ @Override
++ public void removeControllerService(final ControllerServiceNode serviceNode) {
++ final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
++ if ( existing == null || existing != serviceNode ) {
++ throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
++ }
++
++ serviceNode.verifyCanDelete();
++
++ try (final NarCloseable x = NarCloseable.withNarLoader()) {
++ final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
++ }
++
++ controllerServices.remove(serviceNode.getIdentifier());
++ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index 0000000,c04a04f..aca870b
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@@ -1,0 -1,97 +1,97 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.tasks;
+
+ import java.util.concurrent.atomic.AtomicLong;
+
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.connectable.Connectable;
+ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+ import org.apache.nifi.controller.scheduling.ConnectableProcessContext;
+ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+ import org.apache.nifi.controller.scheduling.ScheduleState;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.ProcessSessionFactory;
-import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.util.Connectables;
+ import org.apache.nifi.util.ReflectionUtils;
-
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ContinuallyRunConnectableTask implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunConnectableTask.class);
+
+ private final Connectable connectable;
+ private final ScheduleState scheduleState;
+ private final ProcessSessionFactory sessionFactory;
+ private final ConnectableProcessContext processContext;
+
+ public ContinuallyRunConnectableTask(final ProcessContextFactory contextFactory, final Connectable connectable, final ScheduleState scheduleState, final StringEncryptor encryptor) {
+ this.connectable = connectable;
+ this.scheduleState = scheduleState;
+ this.sessionFactory = new StandardProcessSessionFactory(contextFactory.newProcessContext(connectable, new AtomicLong(0L)));
+ this.processContext = new ConnectableProcessContext(connectable, encryptor);
+ }
+
++ @SuppressWarnings("deprecation")
+ @Override
+ public void run() {
+ if (!scheduleState.isScheduled()) {
+ return;
+ }
+ // Connectable should run if the following conditions are met:
+ // 1. It's an Input Port or is a Remote Input Port or has incoming FlowFiles queued
+ // 2. Any relationship is available (since there's only 1
+ // relationship for a Connectable, we can say "any" or "all" and
+ // it means the same thing)
+ // 3. It is not yielded.
+ final boolean triggerWhenEmpty = connectable.isTriggerWhenEmpty();
+ final boolean shouldRun = (connectable.getYieldExpiration() < System.currentTimeMillis())
+ && (triggerWhenEmpty || Connectables.flowFilesQueued(connectable)) && (connectable.getRelationships().isEmpty() || Connectables.anyRelationshipAvailable(connectable));
+
+ if (shouldRun) {
+ scheduleState.incrementActiveThreadCount();
+ try {
+ try (final AutoCloseable ncl = NarCloseable.withNarLoader()) {
+ connectable.onTrigger(processContext, sessionFactory);
+ } catch (final ProcessException pe) {
+ logger.error("{} failed to process session due to {}", connectable, pe.toString());
+ } catch (final Throwable t) {
+ logger.error("{} failed to process session due to {}", connectable, t.toString());
+ logger.error("", t);
+
+ logger.warn("{} Administratively Pausing for 10 seconds due to processing failure: {}", connectable, t.toString(), t);
+ try {
+ Thread.sleep(10000L);
+ } catch (final InterruptedException e) {
+ }
+
+ }
+ } finally {
+ if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, connectable, processContext);
+ }
+ }
+
+ scheduleState.decrementActiveThreadCount();
+ }
+ }
+ }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index 0000000,65c375f..33bd327
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@@ -1,0 -1,185 +1,185 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.tasks;
+
+ import java.io.IOException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicLong;
+
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.controller.FlowController;
+ import org.apache.nifi.controller.ProcessorNode;
+ import org.apache.nifi.controller.repository.BatchingSessionFactory;
+ import org.apache.nifi.controller.repository.ProcessContext;
+ import org.apache.nifi.controller.repository.StandardFlowFileEvent;
+ import org.apache.nifi.controller.repository.StandardProcessSession;
+ import org.apache.nifi.controller.repository.StandardProcessSessionFactory;
+ import org.apache.nifi.controller.scheduling.ProcessContextFactory;
+ import org.apache.nifi.controller.scheduling.ScheduleState;
+ import org.apache.nifi.controller.scheduling.SchedulingAgent;
+ import org.apache.nifi.encrypt.StringEncryptor;
+ import org.apache.nifi.logging.ProcessorLog;
+ import org.apache.nifi.nar.NarCloseable;
+ import org.apache.nifi.processor.ProcessSessionFactory;
+ import org.apache.nifi.processor.SimpleProcessLogger;
+ import org.apache.nifi.processor.StandardProcessContext;
-import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.processor.exception.ProcessException;
+ import org.apache.nifi.util.Connectables;
+ import org.apache.nifi.util.ReflectionUtils;
-
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ContinuallyRunProcessorTask implements Runnable { // Runnable that triggers a ProcessorNode once, or repeatedly until its run duration elapses when batching, then records invocation statistics
+
+ private static final Logger logger = LoggerFactory.getLogger(ContinuallyRunProcessorTask.class);
+
+ private final SchedulingAgent schedulingAgent; // supplies the administrative yield duration applied after an unexpected Throwable
+ private final ProcessorNode procNode; // the processor node whose onTrigger is invoked by run()
+ private final ProcessContext context; // repository-level context: backs the sessions, the relationship-availability check and the FlowFileEvent repository
+ private final ScheduleState scheduleState; // tracks the active thread count and whether the processor is still scheduled
+ private final StandardProcessContext processContext; // context handed to onTrigger and to the OnStopped methods
+ private final FlowController flowController; // consulted for clustered/primary status; heartbeat() is called after OnStopped methods run
+ private final int numRelationships; // relationship count captured once, at construction time
+
+ public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode, // stores the collaborators and builds both contexts for procNode
+ final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, final StringEncryptor encryptor) { // encryptor is only passed through to StandardProcessContext
+
+ this.schedulingAgent = schedulingAgent;
+ this.procNode = procNode;
+ this.scheduleState = scheduleState;
+ this.numRelationships = procNode.getRelationships().size();
+ this.flowController = flowController;
+
+ context = contextFactory.newProcessContext(procNode, new AtomicLong(0L)); // NOTE(review): the meaning of the AtomicLong(0L) argument is not visible here - confirm against ProcessContextFactory
+ this.processContext = new StandardProcessContext(procNode, flowController, encryptor);
+ }
+
++ @SuppressWarnings("deprecation") // presumably needed for the org.apache.nifi.processor.annotation.OnStopped reference below - confirm
+ @Override
+ public void run() { // applies the pre-checks below, triggers the processor, and lets the finally block handle commit, OnStopped, thread count and statistics
+ // make sure processor is not yielded
+ boolean shouldRun = (procNode.getYieldExpiration() < System.currentTimeMillis());
+ if (!shouldRun) {
+ return;
+ }
+
+ // make sure that either we're not clustered or this processor runs on all nodes or that this is the primary node
+ shouldRun = !procNode.isIsolated() || !flowController.isClustered() || flowController.isPrimary();
+ if (!shouldRun) {
+ return;
+ }
+
+ // make sure that either proc has incoming FlowFiles or has no incoming connections or is annotated with @TriggerWhenEmpty
+ shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode);
+ if (!shouldRun) {
+ return;
+ }
+
+ if (numRelationships > 0) { // with no relationships the availability check is skipped and shouldRun stays true
+ final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships; // one available relationship suffices when triggering on any destination; otherwise all are required
+ shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
+ }
+
+ final long batchNanos = procNode.getRunDuration(TimeUnit.NANOSECONDS); // run duration in nanoseconds; a positive value enables batching when high throughput is supported
+ final ProcessSessionFactory sessionFactory;
+ final StandardProcessSession rawSession;
+ final boolean batch;
+ if (procNode.isHighThroughputSupported() && batchNanos > 0L) { // batch mode: one raw session wrapped by BatchingSessionFactory and committed in the finally block
+ rawSession = new StandardProcessSession(context);
+ sessionFactory = new BatchingSessionFactory(rawSession);
+ batch = true;
+ } else {
+ rawSession = null; // no shared session outside batch mode
+ sessionFactory = new StandardProcessSessionFactory(context);
+ batch = false;
+ }
+
+ if (!shouldRun) { // outcome of the relationship-availability check above; NOTE(review): the session objects have already been created by this point
+ return;
+ }
+
+ scheduleState.incrementActiveThreadCount(); // paired with decrementActiveThreadCount() in the finally block
+
+ final long startNanos = System.nanoTime();
+ final long finishNanos = startNanos + batchNanos; // deadline for the batching loop
+ int invocationCount = 0;
+ try {
+ try (final AutoCloseable ncl = NarCloseable.withNarLoader()) { // closed automatically when this try block exits, including via the returns below
+ while (shouldRun) {
+ procNode.onTrigger(processContext, sessionFactory);
+ invocationCount++;
+
+ if (!batch) { // non-batch mode triggers exactly once; the return still runs the finally block
+ return;
+ }
+
+ if (System.nanoTime() > finishNanos) { // run duration exhausted
+ return;
+ }
+
+ shouldRun = procNode.isTriggerWhenEmpty() || !procNode.hasIncomingConnection() || Connectables.flowFilesQueued(procNode); // re-evaluate the work-available condition before the next iteration
+ shouldRun = shouldRun && (procNode.getYieldExpiration() < System.currentTimeMillis()); // stop if the processor has yielded in the meantime
+
+ if (shouldRun && numRelationships > 0) {
+ final int requiredNumberOfAvailableRelationships = procNode.isTriggerWhenAnyDestinationAvailable() ? 1 : numRelationships;
+ shouldRun = context.isRelationshipAvailabilitySatisfied(requiredNumberOfAvailableRelationships);
+ }
+ }
+ } catch (final ProcessException pe) { // ProcessException is only logged; no administrative yield is applied
+ final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+ procLog.error("Failed to process session due to {}", new Object[]{pe});
+ } catch (final Throwable t) {
+ // Use ProcessorLog to log the event so that a bulletin will be created for this processor
+ final ProcessorLog procLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+ procLog.error("{} failed to process session due to {}", new Object[]{procNode.getProcessor(), t});
+ procLog.warn("Processor Administratively Yielded for {} due to processing failure", new Object[]{schedulingAgent.getAdministrativeYieldDuration()});
+ logger.warn("Administratively Yielding {} due to uncaught Exception: {}", procNode.getProcessor(), t.toString());
+ logger.warn("", t);
+
+ procNode.yield(schedulingAgent.getAdministrativeYieldDuration(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); // back the processor off for the administrative yield duration
+ }
+ } finally {
+ if (batch) { // rawSession is non-null exactly when batch is true
+ rawSession.commit(); // NOTE(review): an exception thrown here would skip the OnStopped handling and the thread-count decrement below - confirm commit() cannot throw
+ }
+
+ final long processingNanos = System.nanoTime() - startNanos; // measured after the commit above, so it includes commit time
+
+ // if the processor is no longer scheduled to run and this is the last thread,
+ // invoke the OnStopped methods
+ if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { // count is 1 because this thread has not yet decremented it
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, procNode.getProcessor(), processContext);
++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, procNode.getProcessor(), processContext); // both OnStopped annotation types are passed
+ flowController.heartbeat();
+ }
+ }
+
+ scheduleState.decrementActiveThreadCount();
+
+ try { // report processing time and invocation count for this run
+ final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
+ procEvent.setProcessingNanos(processingNanos);
+ procEvent.setInvocations(invocationCount);
+ context.getFlowFileEventRepository().updateRepository(procEvent);
+ } catch (final IOException e) { // a statistics failure is logged, not propagated
+ logger.error("Unable to update FlowFileEvent Repository for {}; statistics may be inaccurate. Reason for failure: {}", procNode.getProcessor(), e.toString());
+ logger.error("", e);
+ }
+ }
+ }
+
+ }
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/716e03b5/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
----------------------------------------------------------------------
diff --cc nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index 0000000,36aa9dd..9b70581
mode 000000,100644..100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@@ -1,0 -1,63 +1,63 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ package org.apache.nifi.controller.tasks;
+
++import org.apache.nifi.annotation.lifecycle.OnStopped;
+ import org.apache.nifi.controller.ReportingTaskNode;
+ import org.apache.nifi.controller.scheduling.ScheduleState;
+ import org.apache.nifi.nar.NarCloseable;
-import org.apache.nifi.processor.annotation.OnStopped;
+ import org.apache.nifi.util.ReflectionUtils;
-
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ public class ReportingTaskWrapper implements Runnable { // Runnable that triggers a ReportingTask once and invokes its OnStopped methods when it is the last active thread of an unscheduled task
+
+ private static final Logger logger = LoggerFactory.getLogger(ReportingTaskWrapper.class);
+
+ private final ReportingTaskNode taskNode; // provides the ReportingTask and its reporting context
+ private final ScheduleState scheduleState; // tracks the active thread count and whether the task is still scheduled
+
+ public ReportingTaskWrapper(final ReportingTaskNode taskNode, final ScheduleState scheduleState) { // stores both collaborators; no other work is done here
+ this.taskNode = taskNode;
+ this.scheduleState = scheduleState;
+ }
+
++ @SuppressWarnings("deprecation") // presumably needed for the org.apache.nifi.processor.annotation.OnStopped reference below - confirm
+ @Override
+ public synchronized void run() { // synchronized: at most one thread executes run() on a given wrapper instance at a time
+ scheduleState.incrementActiveThreadCount(); // paired with decrementActiveThreadCount() in the finally block
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) { // closed automatically when this try block exits
+ taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
+ } catch (final Throwable t) { // any failure is logged here and does not escape run()
+ logger.error("Error running task {} due to {}", taskNode.getReportingTask(), t.toString());
+ if (logger.isDebugEnabled()) { // the stack trace is emitted (at error level) only when debug logging is enabled
+ logger.error("", t);
+ }
+ } finally {
+ // if the reporting task is no longer scheduled to run and this is the last thread,
+ // invoke the OnStopped methods
+ if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) { // count is 1 because this thread has not yet decremented it
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext());
++ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getReportingContext()); // both OnStopped annotation types are passed
+ }
+ }
+
+ scheduleState.decrementActiveThreadCount();
+ }
+ }
+
+ }