You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/19 02:26:17 UTC
[1/6] incubator-nifi git commit: NIFI-6: Deprecated @OnConfigured
annotation in favor of those in the org.apache.nifi.annotation.lifecycle
package
Repository: incubator-nifi
Updated Branches:
refs/heads/annotations [created] 850396cc9
NIFI-6: Deprecated @OnConfigured annotation in favor of those in the org.apache.nifi.annotation.lifecycle package
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6b5d1a86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6b5d1a86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6b5d1a86
Branch: refs/heads/annotations
Commit: 6b5d1a86bec2d1d56bc44c2e400820feb4b42e2a
Parents: 0f31032
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 12:58:37 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 12:58:37 2015 -0500
----------------------------------------------------------------------
.../java/org/apache/nifi/controller/annotation/OnConfigured.java | 3 +++
1 file changed, 3 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b5d1a86/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
index 70f2c60..78cc04b 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
@@ -31,11 +31,14 @@ import java.lang.annotation.Target;
* {@link nifi.controller.ConfigurationContext ConfigurationContext}.
*
* @author none
+ *
+ * @deprecated This annotation has been replaced by those in the {@link org.apache.nifi.annotation.lifecycle} package.
*/
@Documented
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
+@Deprecated
public @interface OnConfigured {
}
[2/6] incubator-nifi git commit: NIFI-4: Fixed documentation of
OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke
OnScheduled, OnUnscheduled, OnStopped methods appropriately.
Posted by ma...@apache.org.
NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke OnScheduled, OnUnscheduled, OnStopped methods appropriately.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/68707ce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/68707ce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/68707ce3
Branch: refs/heads/annotations
Commit: 68707ce3c43f96e6a26789686c8f5bc397c6a532
Parents: 6b5d1a8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 13:35:17 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 13:35:17 2015 -0500
----------------------------------------------------------------------
.../scheduling/StandardProcessScheduler.java | 50 ++++++++++----------
.../nifi/annotation/lifecycle/OnScheduled.java | 2 +-
.../annotation/lifecycle/OnUnscheduled.java | 14 ++++++
3 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 5950b4e..e565ebc 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -161,18 +161,21 @@ public final class StandardProcessScheduler implements ProcessScheduler {
scheduleState.setScheduled(true);
final Runnable startReportingTaskRunnable = new Runnable() {
+ @SuppressWarnings("deprecation")
@Override
public void run() {
+ // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
while (true) {
final ReportingTask reportingTask = taskNode.getReportingTask();
try {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
+ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
}
+
break;
} catch (final InvocationTargetException ite) {
- LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+ LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
LOG.error("", ite.getTargetException());
@@ -181,7 +184,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
} catch (final InterruptedException ie) {
}
} catch (final Exception e) {
- LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+ LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
try {
Thread.sleep(administrativeYieldMillis);
@@ -213,34 +216,31 @@ public final class StandardProcessScheduler implements ProcessScheduler {
public void run() {
final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
- while (true) {
- try {
- try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
- }
- break;
- } catch (final InvocationTargetException ite) {
- LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
- new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
- LOG.error("", ite.getTargetException());
+ try {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
+ }
+ } catch (final InvocationTargetException ite) {
+ LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+ new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
+ LOG.error("", ite.getTargetException());
- try {
- Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {
- }
- } catch (final Exception e) {
- LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
- new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
- try {
- Thread.sleep(administrativeYieldMillis);
- } catch (final InterruptedException ie) {
- }
+ try {
+ Thread.sleep(administrativeYieldMillis);
+ } catch (final InterruptedException ie) {
+ }
+ } catch (final Exception e) {
+ LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+ new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+ try {
+ Thread.sleep(administrativeYieldMillis);
+ } catch (final InterruptedException ie) {
}
}
agent.unschedule(taskNode, scheduleState);
- if (scheduleState.getActiveThreadCount() == 0) {
+ if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
index 9dfd150..a0703fa 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
@@ -41,7 +41,7 @@ import java.lang.annotation.Target;
*
* <p>
* If using 1 argument and the component using the annotation is a Reporting Task, that argument must
- * be of type {@link org.apache.nifi.reporting.ReportingContext ReportingContext}.
+ * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
* </p>
*
* If any method annotated with this annotation throws any Throwable, the framework will wait a while
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
index 68d0fe8..b1dbde1 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
@@ -33,6 +33,20 @@ import java.lang.annotation.Target;
* threads are potentially running. To invoke a method after all threads have
* finished processing, see the {@link OnStopped} annotation.
* </p>
+ *
+ * <p>
+ * Methods using this annotation must take either 0 arguments or a single argument.
+ * </p>
+ *
+ * <p>
+ * If using 1 argument and the component using the annotation is a Processor, that argument must
+ * be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}.
+ * </p>
+ *
+ * <p>
+ * If using 1 argument and the component using the annotation is a Reporting Task, that argument must
+ * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
+ * </p>
*
* @author none
*/
[5/6] incubator-nifi git commit: NIFI-4: Added lifecycle annotation
support
Posted by ma...@apache.org.
NIFI-4: Added lifecycle annotation support
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d8e1f570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d8e1f570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d8e1f570
Branch: refs/heads/annotations
Commit: d8e1f570a68df152f1d29d60acf732a0f6b532ec
Parents: d734220
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 15:52:47 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 15:52:47 2015 -0500
----------------------------------------------------------------------
.../service/ControllerServiceProvider.java | 21 +++--
.../apache/nifi/controller/FlowController.java | 57 ++++++++++++-
.../scheduling/StandardProcessScheduler.java | 88 ++++++++++++++++++--
.../StandardControllerServiceProvider.java | 43 +++++++---
.../processor/StandardSchedulingContext.java | 4 +-
.../org/apache/nifi/util/ReflectionUtils.java | 62 ++++++++++++--
6 files changed, 238 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 35a255d..03ed779 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,8 +16,7 @@
*/
package org.apache.nifi.controller.service;
-import java.util.Map;
-
+import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.controller.ControllerServiceLookup;
/**
@@ -26,15 +25,15 @@ import org.apache.nifi.controller.ControllerServiceLookup;
public interface ControllerServiceProvider extends ControllerServiceLookup {
/**
- * Gets the controller service for the specified identifier. Returns null if
- * the identifier does not match a known service.
+ * Creates a new Controller Service of the given type and assigns it the given id. If <code>firstTimeAdded</code>
+ * is true, calls any methods that are annotated with {@link OnAdded}
*
* @param type
* @param id
- * @param properties
+ * @param firstTimeAdded
* @return
*/
- ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties);
+ ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded);
/**
* Gets the controller service node for the specified identifier. Returns
@@ -44,4 +43,14 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
* @return
*/
ControllerServiceNode getControllerServiceNode(String id);
+
+ /**
+ * Removes the given Controller Service from the flow. This will call all appropriate methods
+ * that have the @OnRemoved annotation.
+ *
+ * @param serviceNode the controller service to remove
+ *
+ * @throws IllegalStateException if the controller service is not disabled or is not a part of this flow
+ */
+ void removeControllerService(ControllerServiceNode serviceNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index 860ea2d..1d90a3a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -50,6 +50,7 @@ import javax.net.ssl.SSLContext;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.DataFlow;
@@ -134,6 +135,7 @@ import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ProcessorLogObserver;
import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoader;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
import org.apache.nifi.processor.Processor;
@@ -2463,6 +2465,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
public ReportingTaskNode createReportingTask(final String type, String id) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, id, true);
+ }
+
+ public ReportingTaskNode createReportingTask(final String type, String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
if (type == null) {
throw new NullPointerException();
}
@@ -2484,7 +2490,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
final Object reportingTaskObj = reportingTaskClass.newInstance();
task = reportingTaskClass.cast(reportingTaskObj);
-
} catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
throw new ReportingTaskInstantiationException(type, t);
} finally {
@@ -2495,6 +2500,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
+
+ if ( firstTimeAdded ) {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
+ }
+ }
+
reportingTasks.put(id, taskNode);
return taskNode;
}
@@ -2519,13 +2533,45 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler.unschedule(reportingTaskNode);
}
+ public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+ final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+ if ( existing == null || existing != reportingTaskNode ) {
+ throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+ }
+
+ reportingTaskNode.verifyCanDelete();
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+ }
+
+ reportingTasks.remove(reportingTaskNode.getIdentifier());
+ }
+
Collection<ReportingTaskNode> getReportingTasks() {
return reportingTasks.values();
}
+
+ public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
+ processScheduler.enableReportingTask(reportingTaskNode);
+ }
+
+ public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
+ processScheduler.disableReportingTask(reportingTaskNode);
+ }
+
+ public void enableControllerService(final ControllerServiceNode serviceNode) {
+ processScheduler.enableControllerService(serviceNode);
+ }
+
+ public void disableControllerService(final ControllerServiceNode serviceNode) {
+ processScheduler.disableControllerService(serviceNode);
+ }
+
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
- return controllerServiceProvider.createControllerService(type, id.intern(), properties);
+ public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+ return controllerServiceProvider.createControllerService(type, id.intern(), firstTimeAdded);
}
@Override
@@ -2548,6 +2594,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
}
+ @Override
+ public void removeControllerService(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.removeControllerService(serviceNode);
+ }
+
//
// Counters
//
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index e565ebc..0653b03 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -27,6 +27,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -41,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
@@ -514,14 +517,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
@Override
- public synchronized void disableProcessor(final ProcessorNode procNode) {
- if (procNode.getScheduledState() != ScheduledState.STOPPED) {
- throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
- }
- procNode.setScheduledState(ScheduledState.DISABLED);
- }
-
- @Override
public synchronized void enablePort(final Port port) {
if (port.getScheduledState() != ScheduledState.DISABLED) {
throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
@@ -539,9 +534,84 @@ public final class StandardProcessScheduler implements ProcessScheduler {
if (procNode.getScheduledState() != ScheduledState.DISABLED) {
throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
}
+
procNode.setScheduledState(ScheduledState.STOPPED);
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog);
+ }
+ }
+
+ @Override
+ public synchronized void disableProcessor(final ProcessorNode procNode) {
+ if (procNode.getScheduledState() != ScheduledState.STOPPED) {
+ throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
+ }
+
+ procNode.setScheduledState(ScheduledState.DISABLED);
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog);
+ }
}
+ public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
+ if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+ throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
+ }
+
+ taskNode.setScheduledState(ScheduledState.STOPPED);
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
+ }
+ }
+
+ public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
+ if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
+ throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
+ }
+
+ taskNode.setScheduledState(ScheduledState.DISABLED);
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
+ }
+ }
+
+ public synchronized void enableControllerService(final ControllerServiceNode serviceNode) {
+ if ( !serviceNode.isDisabled() ) {
+ throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled");
+ }
+
+ // we set the service to enabled before invoking the @OnEnabled methods. We do this because it must be
+ // done in this order for disabling (serviceNode.setDisabled(true) will throw Exceptions if the service
+ // is currently known to be in use) and we want to be consistent with the ordering of calling setDisabled
+ // before annotated methods.
+ serviceNode.setDisabled(false);
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation());
+ }
+ }
+
+ public synchronized void disableControllerService(final ControllerServiceNode serviceNode) {
+ if ( serviceNode.isDisabled() ) {
+ throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled");
+ }
+
+ // We must set the service to disabled before we invoke the OnDisabled methods because the service node
+ // can throw Exceptions if we attempt to disable the service while it's known to be in use.
+ serviceNode.setDisabled(true);
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
+ }
+ }
+
+
@Override
public boolean isScheduled(final Object scheduled) {
final ScheduleState scheduleState = scheduleStates.get(scheduled);
@@ -549,7 +619,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
}
/**
- * Returns the ScheduleState that is registered for the given ProcessorNode;
+ * Returns the ScheduleState that is registered for the given component;
* if no ScheduleState is currently registered, one is created and registered
* atomically, and then that value is returned.
*
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index fc07ce1..bf0039a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -30,17 +30,20 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.processor.StandardValidationContextFactory;
import org.apache.nifi.util.ObjectHolder;
import org.apache.nifi.util.ReflectionUtils;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -93,7 +96,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
+ public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
if (type == null || id == null) {
throw new NullPointerException();
}
@@ -139,15 +142,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
- final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, id, validationContextFactory, this);
+ final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
serviceNodeHolder.set(serviceNode);
serviceNode.setAnnotationData(null);
serviceNode.setName(id);
- for (final Map.Entry<String, String> entry : properties.entrySet()) {
- serviceNode.setProperty(entry.getKey(), entry.getValue());
+
+ if ( firstTimeAdded ) {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
+ }
}
- final StandardConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, originalService, configurationContext);
this.controllerServices.put(id, serviceNode);
return serviceNode;
@@ -163,7 +169,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public ControllerService getControllerService(final String serviceIdentifier) {
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
- return (node == null) ? null : node.getControllerService();
+ return (node == null) ? null : node.getProxiedControllerService();
}
@Override
@@ -186,11 +192,28 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
final Set<String> identifiers = new HashSet<>();
for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
- if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass())) {
+ if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
identifiers.add(entry.getKey());
}
}
return identifiers;
}
+
+ @Override
+ public void removeControllerService(final ControllerServiceNode serviceNode) {
+ final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
+ if ( existing == null || existing != serviceNode ) {
+ throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
+ }
+
+ serviceNode.verifyCanDelete();
+
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
+ }
+
+ controllerServices.remove(serviceNode.getIdentifier());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 318901f..ac58504 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -46,11 +46,11 @@ public class StandardSchedulingContext implements SchedulingContext {
}
if (serviceNode.isDisabled()) {
- throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is currently disabled");
+ throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled");
}
if (!serviceNode.isValid()) {
- throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is not currently valid");
+ throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently valid");
}
serviceNode.addReference(processorNode);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
index f8e7da4..a8a4596 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
+import org.apache.nifi.logging.ProcessorLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,7 +149,28 @@ public class ReflectionUtils {
* is returned, an error will have been logged.
*/
public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) {
- return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, args);
+ return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, null, args);
+ }
+
+
+ /**
+ * Invokes all methods on the given instance that have been annotated with
+ * the given Annotation. If the signature of the method that is defined in
+ * <code>instance</code> uses 1 or more parameters, those parameters must be
+ * specified by the <code>args</code> parameter. However, if more arguments
+ * are supplied by the <code>args</code> parameter than needed, the extra
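+ * arguments will be ignored. Errors are logged via <code>logger</code> when it is non-null.
+ *
+ * @param annotation the annotation that marks the methods to invoke
+ * @param instance the object whose annotated methods are to be invoked
+ * @param args the arguments to pass to each invoked method; surplus arguments are ignored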
+ * @return <code>true</code> if all appropriate methods were invoked and
+ * returned without throwing an Exception, <code>false</code> if one of the
+ * methods threw an Exception or could not be invoked; if <code>false</code>
+ * is returned, an error will have been logged.
+ */
+ public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final ProcessorLog logger, final Object... args) {
+ return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, logger, args);
}
@@ -165,13 +187,15 @@ public class ReflectionUtils {
* @param preferredAnnotation
* @param alternateAnnotation
* @param instance
+ * @param logger the ProcessorLog to use for logging any errors. If null, this class's own logger is used instead, but that will not generate bulletins
+ * or easily tie the errors to the Processor's log messages.
* @param args
* @return <code>true</code> if all appropriate methods were invoked and
* returned without throwing an Exception, <code>false</code> if one of the
* methods threw an Exception or could not be invoked; if <code>false</code>
* is returned, an error will have been logged.
*/
- public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final Object... args) {
+ public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final ProcessorLog logger, final Object... args) {
final List<Class<? extends Annotation>> annotationClasses = new ArrayList<>(alternateAnnotation == null ? 1 : 2);
annotationClasses.add(preferredAnnotation);
if ( alternateAnnotation != null ) {
@@ -194,16 +218,28 @@ public class ReflectionUtils {
try {
final Class<?>[] argumentTypes = method.getParameterTypes();
if (argumentTypes.length > args.length) {
- LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
+ if ( logger == null ) {
+ LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
new Object[]{method.getName(), instance, argumentTypes.length, args.length});
+ } else {
+ logger.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
+ new Object[]{method.getName(), instance, argumentTypes.length, args.length});
+ }
+
return false;
}
for (int i = 0; i < argumentTypes.length; i++) {
final Class<?> argType = argumentTypes[i];
if (!argType.isAssignableFrom(args[i].getClass())) {
- LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
+ if ( logger == null ) {
+ LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
+ } else {
+ logger.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
+ new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
+ }
+
return false;
}
}
@@ -219,9 +255,21 @@ public class ReflectionUtils {
method.invoke(instance, argsToPass);
}
- } catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) {
- LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
- LOG.error("", t);
+ } catch (final InvocationTargetException ite) {
+ if ( logger == null ) {
+ LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+ LOG.error("", ite.getCause());
+ } else {
+ logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+ }
+ } catch (final IllegalAccessException | IllegalArgumentException t) {
+ if ( logger == null ) {
+ LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
+ LOG.error("", t);
+ } else {
+ logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
+ }
+
return false;
}
} finally {
[6/6] incubator-nifi git commit: NIFI-4: Updates to provide proper
lifecycle support via annotations for controller services and reporting tasks
Posted by ma...@apache.org.
NIFI-4: Updates to provide proper lifecycle support via annotations for controller services and reporting tasks
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/850396cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/850396cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/850396cc
Branch: refs/heads/annotations
Commit: 850396cc979173e2f20ab08004f1983024d66b00
Parents: d8e1f57
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Jan 18 20:25:32 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Jan 18 20:25:32 2015 -0500
----------------------------------------------------------------------
.../org/apache/nifi/controller/ReportingTaskNode.java | 4 ++++
.../nifi/controller/service/ControllerServiceNode.java | 2 ++
.../java/org/apache/nifi/controller/FlowController.java | 12 ++++++++++++
.../controller/reporting/AbstractReportingTaskNode.java | 1 +
.../service/StandardControllerServiceProvider.java | 1 -
5 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index f456ddd..0db49bd 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -68,5 +68,9 @@ public interface ReportingTaskNode extends ConfiguredComponent {
void setScheduledState(ScheduledState state);
+ void verifyCanStart();
+ void verifyCanStop();
+ void verifyCanDisable();
+ void verifyCanEnable();
void verifyCanDelete();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index dd4b49a..357d4de 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -40,5 +40,7 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void removeReference(ConfiguredComponent referringComponent);
+ void verifyCanEnable();
+ void verifyCanDisable();
void verifyCanDelete();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index 1d90a3a..1b7a3c0 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2522,6 +2522,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
}
+ reportingTaskNode.verifyCanStart();
+
processScheduler.schedule(reportingTaskNode);
}
@@ -2530,6 +2532,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
return;
}
+ reportingTaskNode.verifyCanStop();
+
processScheduler.unschedule(reportingTaskNode);
}
@@ -2554,18 +2558,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
+ reportingTaskNode.verifyCanEnable();
+
processScheduler.enableReportingTask(reportingTaskNode);
}
public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
+ reportingTaskNode.verifyCanDisable();
+
processScheduler.disableReportingTask(reportingTaskNode);
}
public void enableControllerService(final ControllerServiceNode serviceNode) {
+ serviceNode.verifyCanEnable();
+
processScheduler.enableControllerService(serviceNode);
}
public void disableControllerService(final ControllerServiceNode serviceNode) {
+ serviceNode.verifyCanDisable();
+
processScheduler.disableControllerService(serviceNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 8b10a84..f299781 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -159,4 +159,5 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
throw new IllegalStateException(this + " is running");
}
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index bf0039a..1deed59 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -34,7 +34,6 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
[3/6] incubator-nifi git commit: NIFI-4: Added OnEnabled and
OnDisabled annotations to the lifecycle package
Posted by ma...@apache.org.
NIFI-4: Added OnEnabled and OnDisabled annotations to the lifecycle package
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7bcfc93d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7bcfc93d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7bcfc93d
Branch: refs/heads/annotations
Commit: 7bcfc93d6e102691d0e7d7d6b4bc5efb223e8349
Parents: 68707ce
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 13:41:53 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 13:41:53 2015 -0500
----------------------------------------------------------------------
.../nifi/annotation/lifecycle/OnDisabled.java | 46 ++++++++++++++++++++
.../nifi/annotation/lifecycle/OnEnabled.java | 46 ++++++++++++++++++++
2 files changed, 92 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7bcfc93d/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java
new file mode 100644
index 0000000..0f78010
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.lifecycle;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation a {@link org.apache.nifi.processor.Processor Processor},
+ * {@link org.apache.nifi.controller.ControllerService ControllerService} or
+ * {@link org.apache.nifi.reporting.ReportingTask ReportingTask}
+ * can use to indicate a method should be called whenever the component is disabled.
+ *
+ * <p>
+ * Methods using this annotation must take no arguments. If a method with this annotation
+ * throws a Throwable, a log message and bulletin will be issued for the component, but
+ * the component will still be disabled.
+ * </p>
+ *
+ * @author none
+ */
+@Documented
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface OnDisabled {
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7bcfc93d/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java
new file mode 100644
index 0000000..a0d7a14
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.lifecycle;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation a {@link org.apache.nifi.processor.Processor Processor},
+ * {@link org.apache.nifi.controller.ControllerService ControllerService} or
+ * {@link org.apache.nifi.reporting.ReportingTask ReportingTask}
+ * can use to indicate a method should be called whenever the component is enabled.
+ *
+ * <p>
+ * Methods using this annotation must take no arguments. If a method with this annotation
+ * throws a Throwable, a log message and bulletin will be issued for the component, but
+ * the component will still be enabled.
+ * </p>
+ *
+ * @author none
+ */
+@Documented
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface OnEnabled {
+
+}
[4/6] incubator-nifi git commit: NIFI-277: Added verifyCanXX methods
Posted by ma...@apache.org.
NIFI-277: Added verifyCanXX methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d734220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d734220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d734220d
Branch: refs/heads/annotations
Commit: d734220d1e59ff02878a2b9f3913348e8d38ae17
Parents: 7bcfc93
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 15:51:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 15:51:34 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/ReportingTaskNode.java | 16 +++++
.../service/ControllerServiceNode.java | 6 +-
.../reporting/AbstractReportingTaskNode.java | 51 ++++++++++++++++
.../service/StandardControllerServiceNode.java | 61 +++++++++++++++++---
4 files changed, 126 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index 6b8ede0..f456ddd 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -53,4 +53,20 @@ public interface ReportingTaskNode extends ConfiguredComponent {
ConfigurationContext getConfigurationContext();
boolean isRunning();
+
+ /**
+ * Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
+ * value of stopped does NOT indicate that the <code>ReportingTask</code> has
+ * no active threads, only that it is not currently scheduled to be given
+ * any more threads. To determine whether or not the
+ * <code>ReportingTask</code> has any active threads, see
+ * {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}.
+ *
+ * @return the current {@link ScheduledState} of this <code>ReportingTask</code>
+ */
+ ScheduledState getScheduledState();
+
+ void setScheduledState(ScheduledState state);
+
+ void verifyCanDelete();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 6f9c237..dd4b49a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -22,7 +22,9 @@ import org.apache.nifi.controller.ControllerService;
public interface ControllerServiceNode extends ConfiguredComponent {
- ControllerService getControllerService();
+ ControllerService getProxiedControllerService();
+
+ ControllerService getControllerServiceImplementation();
Availability getAvailability();
@@ -37,4 +39,6 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void addReference(ConfiguredComponent referringComponent);
void removeReference(ConfiguredComponent referringComponent);
+
+ void verifyCanDelete();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 6c27470..8b10a84 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -19,18 +19,25 @@ package org.apache.nifi.controller.reporting;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.ReflectionUtils;
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
@@ -42,6 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
+ private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
+
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory) {
@@ -108,4 +117,46 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
}
}
+ @Override
+ public ScheduledState getScheduledState() {
+ return scheduledState;
+ }
+
+ @Override
+ public void setScheduledState(final ScheduledState state) {
+ this.scheduledState = state;
+ }
+
+ @Override
+ public void setProperty(final String name, final String value) {
+ super.setProperty(name, value);
+
+ onConfigured();
+ }
+
+ @Override
+ public boolean removeProperty(String name) {
+ final boolean removed = super.removeProperty(name);
+ if ( removed ) {
+ onConfigured();
+ }
+
+ return removed;
+ }
+
+ private void onConfigured() {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
+ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
+ }
+ }
+
+ @Override
+ public void verifyCanDelete() {
+ if (isRunning()) {
+ throw new IllegalStateException(this + " is running");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 455eac1..61a3aa8 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -26,13 +26,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.Availability;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.util.ReflectionUtils;
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
- private final ControllerService controllerService;
+ private final ControllerService proxedControllerService;
+ private final ControllerService implementation;
+ private final ControllerServiceProvider serviceProvider;
private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
private final AtomicBoolean disabled = new AtomicBoolean(true);
@@ -43,10 +50,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
- public StandardControllerServiceNode(final ControllerService controllerService, final String id,
+ public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
- super(controllerService, id, validationContextFactory, serviceProvider);
- this.controllerService = controllerService;
+ super(proxiedControllerService, id, validationContextFactory, serviceProvider);
+ this.proxedControllerService = proxiedControllerService;
+ this.implementation = implementation;
+ this.serviceProvider = serviceProvider;
}
@Override
@@ -57,7 +66,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void setDisabled(final boolean disabled) {
if (!disabled && !isValid()) {
- throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid");
+ throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
}
if (disabled) {
@@ -82,8 +91,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
@Override
- public ControllerService getControllerService() {
- return controllerService;
+ public ControllerService getProxiedControllerService() {
+ return proxedControllerService;
+ }
+
+ @Override // Returns the `implementation` field, i.e. the second ControllerService passed to the constructor (not the proxied one).
+ public ControllerService getControllerServiceImplementation() {
+ return implementation;
}
@Override
@@ -122,4 +136,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
}
}
+
+ @Override // Sets the property through the superclass, then re-runs the controller service's @OnConfigured methods.
+ public void setProperty(final String name, final String value) {
+ super.setProperty(name, value); // runs first; nothing in this method undoes it if onConfigured() throws
+ // onConfigured() rethrows any failure as ProcessorLifeCycleException, which propagates to the caller.
+ onConfigured();
+ }
+
+ @Override // Removes the property through the superclass and re-runs the @OnConfigured methods only if that call returned true.
+ public boolean removeProperty(String name) {
+ final boolean removed = super.removeProperty(name);
+ if ( removed ) { // no lifecycle callback when super.removeProperty returned false
+ onConfigured(); // may throw ProcessorLifeCycleException; the superclass call has already completed by then
+ }
+ // The superclass's result is returned to the caller unchanged.
+ return removed;
+ }
+
+ private void onConfigured() { // Invokes the @OnConfigured-annotated methods of `implementation` with a newly built ConfigurationContext.
+ try (final NarCloseable x = NarCloseable.withNarLoader()) { // x is never read; it exists so it is closed on exit (presumably restoring the previous class loader -- confirm in NarCloseable)
+ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider); // context is built from this node and its serviceProvider
+ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext); // target is the implementation object, not the proxied service
+ } catch (final Exception e) { // broad catch: any exception raised in the try statement is rewrapped
+ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e); // original exception is kept as the cause
+ }
+ }
+
+ @Override // Throws IllegalStateException unless isDisabled() is true; otherwise returns normally.
+ public void verifyCanDelete() {
+ if ( !isDisabled() ) { // the only condition checked here
+ throw new IllegalStateException(this + " cannot be deleted because it has not been disabled"); // message starts with this node's string form
+ }
+ }
}