You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/01/19 02:26:17 UTC

[1/6] incubator-nifi git commit: NIFI-6: Deprecated @OnConfigured annotation in favor of those in the org.apache.nifi.annotation.lifecycle package

Repository: incubator-nifi
Updated Branches:
  refs/heads/annotations [created] 850396cc9


NIFI-6: Deprecated @OnConfigured annotation in favor of those in the org.apache.nifi.annotation.lifecycle package


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/6b5d1a86
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/6b5d1a86
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/6b5d1a86

Branch: refs/heads/annotations
Commit: 6b5d1a86bec2d1d56bc44c2e400820feb4b42e2a
Parents: 0f31032
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 12:58:37 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 12:58:37 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/controller/annotation/OnConfigured.java  | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/6b5d1a86/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
index 70f2c60..78cc04b 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/controller/annotation/OnConfigured.java
@@ -31,11 +31,14 @@ import java.lang.annotation.Target;
  * {@link nifi.controller.ConfigurationContext ConfigurationContext}.
  *
  * @author none
+ * 
+ * @deprecated This annotation has been replaced by those in the {@link org.apache.nifi.annotation.lifecycle} package.
  */
 @Documented
 @Target({ElementType.METHOD})
 @Retention(RetentionPolicy.RUNTIME)
 @Inherited
+@Deprecated
 public @interface OnConfigured {
 
 }


[2/6] incubator-nifi git commit: NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke OnScheduled, OnUnscheduled, OnStopped methods appropriately.

Posted by ma...@apache.org.
NIFI-4: Fixed documentation of OnScheduled and OnUnscheduled. Updated StandardProcessScheduler to invoke OnScheduled, OnUnscheduled, OnStopped methods appropriately.


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/68707ce3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/68707ce3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/68707ce3

Branch: refs/heads/annotations
Commit: 68707ce3c43f96e6a26789686c8f5bc397c6a532
Parents: 6b5d1a8
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 13:35:17 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 13:35:17 2015 -0500

----------------------------------------------------------------------
 .../scheduling/StandardProcessScheduler.java    | 50 ++++++++++----------
 .../nifi/annotation/lifecycle/OnScheduled.java  |  2 +-
 .../annotation/lifecycle/OnUnscheduled.java     | 14 ++++++
 3 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index 5950b4e..e565ebc 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -161,18 +161,21 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         scheduleState.setScheduled(true);
 
         final Runnable startReportingTaskRunnable = new Runnable() {
+            @SuppressWarnings("deprecation")
             @Override
             public void run() {
+                // Continually attempt to start the Reporting Task, and if we fail sleep for a bit each time.
                 while (true) {
                     final ReportingTask reportingTask = taskNode.getReportingTask();
 
                     try {
                         try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, taskNode.getConfigurationContext());
+                            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, OnScheduled.class, reportingTask, taskNode.getConfigurationContext());
                         }
+                        
                         break;
                     } catch (final InvocationTargetException ite) {
-                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                                 new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
                         LOG.error("", ite.getTargetException());
 
@@ -181,7 +184,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
                         } catch (final InterruptedException ie) {
                         }
                     } catch (final Exception e) {
-                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                        LOG.error("Failed to invoke the On-Scheduled Lifecycle methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
                                 new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
                         try {
                             Thread.sleep(administrativeYieldMillis);
@@ -213,34 +216,31 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             public void run() {
                 final ConfigurationContext configurationContext = taskNode.getConfigurationContext();
 
-                while (true) {
-                    try {
-                        try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                            ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
-                        }
-                        break;
-                    } catch (final InvocationTargetException ite) {
-                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
-                        LOG.error("", ite.getTargetException());
+                try {
+                    try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                        ReflectionUtils.invokeMethodsWithAnnotation(OnUnscheduled.class, org.apache.nifi.processor.annotation.OnUnscheduled.class, reportingTask, configurationContext);
+                    }
+                } catch (final InvocationTargetException ite) {
+                    LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                            new Object[]{reportingTask, ite.getTargetException(), administrativeYieldDuration});
+                    LOG.error("", ite.getTargetException());
 
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
-                    } catch (final Exception e) {
-                        LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
-                                new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
-                        try {
-                            Thread.sleep(administrativeYieldMillis);
-                        } catch (final InterruptedException ie) {
-                        }
+                    try {
+                        Thread.sleep(administrativeYieldMillis);
+                    } catch (final InterruptedException ie) {
+                    }
+                } catch (final Exception e) {
+                    LOG.error("Failed to invoke the @OnConfigured methods of {} due to {}; administratively yielding this ReportingTask and will attempt to schedule it again after {}",
+                            new Object[]{reportingTask, e.toString(), administrativeYieldDuration}, e);
+                    try {
+                        Thread.sleep(administrativeYieldMillis);
+                    } catch (final InterruptedException ie) {
                     }
                 }
 
                 agent.unschedule(taskNode, scheduleState);
 
-                if (scheduleState.getActiveThreadCount() == 0) {
+                if (scheduleState.getActiveThreadCount() == 0 && scheduleState.mustCallOnStoppedMethods()) {
                     ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, reportingTask, configurationContext);
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
index 9dfd150..a0703fa 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnScheduled.java
@@ -41,7 +41,7 @@ import java.lang.annotation.Target;
  * 
  * <p>
  * If using 1 argument and the component using the annotation is a Reporting Task, that argument must
- * be of type {@link org.apache.nifi.reporting.ReportingContext ReportingContext}.
+ * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
  * </p>
  *
  * If any method annotated with this annotation throws any Throwable, the framework will wait a while

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/68707ce3/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
index 68d0fe8..b1dbde1 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnUnscheduled.java
@@ -33,6 +33,20 @@ import java.lang.annotation.Target;
  * threads are potentially running. To invoke a method after all threads have
  * finished processing, see the {@link OnStopped} annotation.
  * </p>
+ * 
+ * <p>
+ * Methods using this annotation must take either 0 arguments or a single argument.
+ * </p>
+ * 
+ * <p>
+ * If using 1 argument and the component using the annotation is a Processor, that argument must
+ * be of type {@link org.apache.nifi.processor.ProcessContext ProcessContext}.
+ * </p>
+ * 
+ * <p>
+ * If using 1 argument and the component using the annotation is a Reporting Task, that argument must
+ * be of type {@link org.apache.nifi.controller.ConfigurationContext ConfigurationContext}.
+ * </p>
  *
  * @author none
  */


[5/6] incubator-nifi git commit: NIFI-4: Added lifecycle annotation support

Posted by ma...@apache.org.
NIFI-4: Added lifecycle annotation support


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d8e1f570
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d8e1f570
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d8e1f570

Branch: refs/heads/annotations
Commit: d8e1f570a68df152f1d29d60acf732a0f6b532ec
Parents: d734220
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 15:52:47 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 15:52:47 2015 -0500

----------------------------------------------------------------------
 .../service/ControllerServiceProvider.java      | 21 +++--
 .../apache/nifi/controller/FlowController.java  | 57 ++++++++++++-
 .../scheduling/StandardProcessScheduler.java    | 88 ++++++++++++++++++--
 .../StandardControllerServiceProvider.java      | 43 +++++++---
 .../processor/StandardSchedulingContext.java    |  4 +-
 .../org/apache/nifi/util/ReflectionUtils.java   | 62 ++++++++++++--
 6 files changed, 238 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 35a255d..03ed779 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,8 +16,7 @@
  */
 package org.apache.nifi.controller.service;
 
-import java.util.Map;
-
+import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.controller.ControllerServiceLookup;
 
 /**
@@ -26,15 +25,15 @@ import org.apache.nifi.controller.ControllerServiceLookup;
 public interface ControllerServiceProvider extends ControllerServiceLookup {
 
     /**
-     * Gets the controller service for the specified identifier. Returns null if
-     * the identifier does not match a known service.
+     * Creates a new Controller Service of the given type and assigns it the given id. If <code>firstTimeadded</code>
+     * is true, calls any methods that are annotated with {@link OnAdded}
      *
      * @param type
      * @param id
-     * @param properties
+     * @param firstTimeAdded
      * @return
      */
-    ControllerServiceNode createControllerService(String type, String id, Map<String, String> properties);
+    ControllerServiceNode createControllerService(String type, String id, boolean firstTimeAdded);
 
     /**
      * Gets the controller service node for the specified identifier. Returns
@@ -44,4 +43,14 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
      * @return
      */
     ControllerServiceNode getControllerServiceNode(String id);
+    
+    /**
+     * Removes the given Controller Service from the flow. This will call all appropriate methods
+     * that have the @OnRemoved annotation.
+     * 
+     * @param serviceNode the controller service to remove
+     * 
+     * @throws IllegalStateException if the controller service is not disabled or is not a part of this flow
+     */
+    void removeControllerService(ControllerServiceNode serviceNode);
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index 860ea2d..1d90a3a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -50,6 +50,7 @@ import javax.net.ssl.SSLContext;
 
 import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
@@ -134,6 +135,7 @@ import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.logging.ProcessorLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
+import org.apache.nifi.nar.NarClassLoader;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
 import org.apache.nifi.processor.Processor;
@@ -2463,6 +2465,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
     }
 
     public ReportingTaskNode createReportingTask(final String type, String id) throws ReportingTaskInstantiationException {
+        return createReportingTask(type, id, true);
+    }
+    
+    public ReportingTaskNode createReportingTask(final String type, String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
         if (type == null) {
             throw new NullPointerException();
         }
@@ -2484,7 +2490,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
             final Class<? extends ReportingTask> reportingTaskClass = rawClass.asSubclass(ReportingTask.class);
             final Object reportingTaskObj = reportingTaskClass.newInstance();
             task = reportingTaskClass.cast(reportingTaskObj);
-
         } catch (final ClassNotFoundException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException t) {
             throw new ReportingTaskInstantiationException(type, t);
         } finally {
@@ -2495,6 +2500,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
         final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
         final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
+        
+        if ( firstTimeAdded ) {
+            try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
+            } catch (final Exception e) {
+                throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + task, e);
+            }
+        }
+        
         reportingTasks.put(id, taskNode);
         return taskNode;
     }
@@ -2519,13 +2533,45 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         processScheduler.unschedule(reportingTaskNode);
     }
 
+    public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
+        final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
+        if ( existing == null || existing != reportingTaskNode ) {
+            throw new IllegalStateException("Reporting Task " + reportingTaskNode + " does not exist in this Flow");
+        }
+        
+        reportingTaskNode.verifyCanDelete();
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
+        }
+        
+        reportingTasks.remove(reportingTaskNode.getIdentifier());
+    }
+    
     Collection<ReportingTaskNode> getReportingTasks() {
         return reportingTasks.values();
     }
 
+
+    public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
+        processScheduler.enableReportingTask(reportingTaskNode);
+    }
+    
+    public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
+        processScheduler.disableReportingTask(reportingTaskNode);
+    }
+    
+    public void enableControllerService(final ControllerServiceNode serviceNode) {
+        processScheduler.enableControllerService(serviceNode);
+    }
+    
+    public void disableControllerService(final ControllerServiceNode serviceNode) {
+        processScheduler.disableControllerService(serviceNode);
+    }
+
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
-        return controllerServiceProvider.createControllerService(type, id.intern(), properties);
+    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+        return controllerServiceProvider.createControllerService(type, id.intern(), firstTimeAdded);
     }
 
     @Override
@@ -2548,6 +2594,11 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
         return controllerServiceProvider.isControllerServiceEnabled(serviceIdentifier);
     }
 
+    @Override
+    public void removeControllerService(final ControllerServiceNode serviceNode) {
+        controllerServiceProvider.removeControllerService(serviceNode);
+    }
+    
     //
     // Counters
     //

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index e565ebc..0653b03 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -27,6 +27,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.nifi.annotation.lifecycle.OnDisabled;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
@@ -41,6 +43,7 @@ import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
@@ -514,14 +517,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     }
 
     @Override
-    public synchronized void disableProcessor(final ProcessorNode procNode) {
-        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
-            throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
-        }
-        procNode.setScheduledState(ScheduledState.DISABLED);
-    }
-
-    @Override
     public synchronized void enablePort(final Port port) {
         if (port.getScheduledState() != ScheduledState.DISABLED) {
             throw new IllegalStateException("Funnel cannot be enabled because it is not disabled");
@@ -539,9 +534,84 @@ public final class StandardProcessScheduler implements ProcessScheduler {
         if (procNode.getScheduledState() != ScheduledState.DISABLED) {
             throw new IllegalStateException("Processor cannot be enabled because it is not disabled");
         }
+        
         procNode.setScheduledState(ScheduledState.STOPPED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, procNode.getProcessor(), processorLog);
+        }
+    }
+
+    @Override
+    public synchronized void disableProcessor(final ProcessorNode procNode) {
+        if (procNode.getScheduledState() != ScheduledState.STOPPED) {
+            throw new IllegalStateException("Processor cannot be disabled because its state is set to " + procNode.getScheduledState());
+        }
+        
+        procNode.setScheduledState(ScheduledState.DISABLED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ProcessorLog processorLog = new SimpleProcessLogger(procNode.getIdentifier(), procNode.getProcessor());
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, procNode.getProcessor(), processorLog);
+        }
     }
 
+    public synchronized void enableReportingTask(final ReportingTaskNode taskNode) {
+        if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+            throw new IllegalStateException("Reporting Task cannot be enabled because it is not disabled");
+        }
+
+        taskNode.setScheduledState(ScheduledState.STOPPED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, taskNode.getReportingTask());
+        }
+    }
+    
+    public synchronized void disableReportingTask(final ReportingTaskNode taskNode) {
+        if ( taskNode.getScheduledState() != ScheduledState.STOPPED ) {
+            throw new IllegalStateException("Reporting Task cannot be disabled because its state is set to " + taskNode.getScheduledState() + " but transition to DISABLED state is allowed only from the STOPPED state");
+        }
+
+        taskNode.setScheduledState(ScheduledState.DISABLED);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, taskNode.getReportingTask());
+        }
+    }
+
+    public synchronized void enableControllerService(final ControllerServiceNode serviceNode) {
+        if ( !serviceNode.isDisabled() ) {
+            throw new IllegalStateException("Controller Service cannot be enabled because it is not disabled");
+        }
+
+        // we set the service to enabled before invoking the @OnEnabled methods. We do this because it must be
+        // done in this order for disabling (serviceNode.setDisabled(true) will throw Exceptions if the service
+        // is currently known to be in use) and we want to be consistent with the ordering of calling setDisabled
+        // before annotated methods.
+        serviceNode.setDisabled(false);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnEnabled.class, serviceNode.getControllerServiceImplementation());
+        }
+    }
+    
+    public synchronized void disableControllerService(final ControllerServiceNode serviceNode) {
+        if ( serviceNode.isDisabled() ) {
+            throw new IllegalStateException("Controller Service cannot be disabled because it is already disabled");
+        }
+
+        // We must set the service to disabled before we invoke the OnDisabled methods because the service node
+        // can throw Exceptions if we attempt to disable the service while it's known to be in use.
+        serviceNode.setDisabled(true);
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnDisabled.class, serviceNode.getControllerServiceImplementation());
+        }
+    }
+    
+    
     @Override
     public boolean isScheduled(final Object scheduled) {
         final ScheduleState scheduleState = scheduleStates.get(scheduled);
@@ -549,7 +619,7 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     }
 
     /**
-     * Returns the ScheduleState that is registered for the given ProcessorNode;
+     * Returns the ScheduleState that is registered for the given component;
      * if no ScheduleState current is registered, one is created and registered
      * atomically, and then that value is returned.
      *

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index fc07ce1..bf0039a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -30,17 +30,20 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.nifi.annotation.lifecycle.OnAdded;
+import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ValidationContextFactory;
-import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
 import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.processor.StandardValidationContextFactory;
 import org.apache.nifi.util.ObjectHolder;
 import org.apache.nifi.util.ReflectionUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -93,7 +96,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     }
 
     @Override
-    public ControllerServiceNode createControllerService(final String type, final String id, final Map<String, String> properties) {
+    public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
@@ -139,15 +142,18 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
             final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(this);
 
-            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, id, validationContextFactory, this);
+            final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
             serviceNodeHolder.set(serviceNode);
             serviceNode.setAnnotationData(null);
             serviceNode.setName(id);
-            for (final Map.Entry<String, String> entry : properties.entrySet()) {
-                serviceNode.setProperty(entry.getKey(), entry.getValue());
+            
+            if ( firstTimeAdded ) {
+                try (final NarCloseable x = NarCloseable.withNarLoader()) {
+                    ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
+                } catch (final Exception e) {
+                    throw new ProcessorLifeCycleException("Failed to invoke On-Added Lifecycle methods of " + originalService, e);
+                }
             }
-            final StandardConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
-            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnConfigured.class, originalService, configurationContext);
 
             this.controllerServices.put(id, serviceNode);
             return serviceNode;
@@ -163,7 +169,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     @Override
     public ControllerService getControllerService(final String serviceIdentifier) {
         final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
-        return (node == null) ? null : node.getControllerService();
+        return (node == null) ? null : node.getProxiedControllerService();
     }
 
     @Override
@@ -186,11 +192,28 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
     public Set<String> getControllerServiceIdentifiers(final Class<? extends ControllerService> serviceType) {
         final Set<String> identifiers = new HashSet<>();
         for (final Map.Entry<String, ControllerServiceNode> entry : controllerServices.entrySet()) {
-            if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getControllerService().getClass())) {
+            if (requireNonNull(serviceType).isAssignableFrom(entry.getValue().getProxiedControllerService().getClass())) {
                 identifiers.add(entry.getKey());
             }
         }
 
         return identifiers;
     }
+    
+    @Override
+    public void removeControllerService(final ControllerServiceNode serviceNode) {
+        final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
+        if ( existing == null || existing != serviceNode ) {
+            throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
+        }
+        
+        serviceNode.verifyCanDelete();
+        
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
+            ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
+        }
+        
+        controllerServices.remove(serviceNode.getIdentifier());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
index 318901f..ac58504 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/processor/StandardSchedulingContext.java
@@ -46,11 +46,11 @@ public class StandardSchedulingContext implements SchedulingContext {
         }
 
         if (serviceNode.isDisabled()) {
-            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is currently disabled");
+            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is currently disabled");
         }
 
         if (!serviceNode.isValid()) {
-            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getControllerService() + " is not currently valid");
+            throw new IllegalStateException("Cannot lease Controller Service because Controller Service " + serviceNode.getProxiedControllerService() + " is not currently valid");
         }
 
         serviceNode.addReference(processorNode);

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d8e1f570/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
index f8e7da4..a8a4596 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/util/ReflectionUtils.java
@@ -22,6 +22,7 @@ import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.nifi.logging.ProcessorLog;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -148,7 +149,28 @@ public class ReflectionUtils {
      * is returned, an error will have been logged.
      */
     public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final Object... args) {
-        return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, args);
+        return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, null, args);
+    }
+    
+    
+    /**
+     * Invokes all methods on the given instance that have been annotated with
+     * the given Annotation. If the signature of the method that is defined in
+     * <code>instance</code> uses 1 or more parameters, those parameters must be
+     * specified by the <code>args</code> parameter. However, if more arguments
+     * are supplied by the <code>args</code> parameter than needed, the extra
+     * arguments will be ignored.
+     *
+     * @param annotation
+     * @param instance
+     * @param args
+     * @return <code>true</code> if all appropriate methods were invoked and
+     * returned without throwing an Exception, <code>false</code> if one of the
+     * methods threw an Exception or could not be invoked; if <code>false</code>
+     * is returned, an error will have been logged.
+     */
+    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> annotation, final Object instance, final ProcessorLog logger, final Object... args) {
+        return quietlyInvokeMethodsWithAnnotation(annotation, null, instance, logger, args);
     }
     
     
@@ -165,13 +187,15 @@ public class ReflectionUtils {
      * @param preferredAnnotation
      * @param alternateAnnotation
      * @param instance
+     * @param logger the ProcessorLog to use for logging any errors. If null, will use own logger, but that will not generate bulletins
+     *          or easily tie to the Processor's log messages.
      * @param args
      * @return <code>true</code> if all appropriate methods were invoked and
      * returned without throwing an Exception, <code>false</code> if one of the
      * methods threw an Exception or could not be invoked; if <code>false</code>
      * is returned, an error will have been logged.
      */
-    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final Object... args) {
+    public static boolean quietlyInvokeMethodsWithAnnotation(final Class<? extends Annotation> preferredAnnotation, final Class<? extends Annotation> alternateAnnotation, final Object instance, final ProcessorLog logger, final Object... args) {
         final List<Class<? extends Annotation>> annotationClasses = new ArrayList<>(alternateAnnotation == null ? 1 : 2);
         annotationClasses.add(preferredAnnotation);
         if ( alternateAnnotation != null ) {
@@ -194,16 +218,28 @@ public class ReflectionUtils {
                     try {
                         final Class<?>[] argumentTypes = method.getParameterTypes();
                         if (argumentTypes.length > args.length) {
-                            LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
+                            if ( logger == null ) {
+                                LOG.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
                                     new Object[]{method.getName(), instance, argumentTypes.length, args.length});
+                            } else {
+                                logger.error("Unable to invoke method {} on {} because method expects {} parameters but only {} were given",
+                                        new Object[]{method.getName(), instance, argumentTypes.length, args.length});
+                            }
+                            
                             return false;
                         }
     
                         for (int i = 0; i < argumentTypes.length; i++) {
                             final Class<?> argType = argumentTypes[i];
                             if (!argType.isAssignableFrom(args[i].getClass())) {
-                                LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
+                                if ( logger == null ) {
+                                    LOG.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
                                         new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
+                                } else {
+                                    logger.error("Unable to invoke method {} on {} because method parameter {} is expected to be of type {} but argument passed was of type {}",
+                                            new Object[]{method.getName(), instance, i, argType, args[i].getClass()});
+                                }
+                                
                                 return false;
                             }
                         }
@@ -219,9 +255,21 @@ public class ReflectionUtils {
     
                                 method.invoke(instance, argsToPass);
                             }
-                        } catch (final IllegalAccessException | IllegalArgumentException | InvocationTargetException t) {
-                            LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
-                            LOG.error("", t);
+                        } catch (final InvocationTargetException ite) {
+                            if ( logger == null ) {
+                                LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+                                LOG.error("", ite.getCause());
+                            } else {
+                                logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, ite.getCause()});
+                            }
+                        } catch (final IllegalAccessException | IllegalArgumentException t) {
+                            if ( logger == null ) {
+                                LOG.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
+                                LOG.error("", t);
+                            } else {
+                                logger.error("Unable to invoke method {} on {} due to {}", new Object[]{method.getName(), instance, t});
+                            }
+                            
                             return false;
                         }
                     } finally {


[6/6] incubator-nifi git commit: NIFI-4: Updates to provide proper lifecycle support via annotations for controller services and reporting tasks

Posted by ma...@apache.org.
NIFI-4: Updates to provide proper lifecycle support via annotations for controller services and reporting tasks


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/850396cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/850396cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/850396cc

Branch: refs/heads/annotations
Commit: 850396cc979173e2f20ab08004f1983024d66b00
Parents: d8e1f57
Author: Mark Payne <ma...@hotmail.com>
Authored: Sun Jan 18 20:25:32 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Sun Jan 18 20:25:32 2015 -0500

----------------------------------------------------------------------
 .../org/apache/nifi/controller/ReportingTaskNode.java   |  4 ++++
 .../nifi/controller/service/ControllerServiceNode.java  |  2 ++
 .../java/org/apache/nifi/controller/FlowController.java | 12 ++++++++++++
 .../controller/reporting/AbstractReportingTaskNode.java |  1 +
 .../service/StandardControllerServiceProvider.java      |  1 -
 5 files changed, 19 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index f456ddd..0db49bd 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -68,5 +68,9 @@ public interface ReportingTaskNode extends ConfiguredComponent {
     
     void setScheduledState(ScheduledState state);
     
+    void verifyCanStart();
+    void verifyCanStop();
+    void verifyCanDisable();
+    void verifyCanEnable();
     void verifyCanDelete();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index dd4b49a..357d4de 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -40,5 +40,7 @@ public interface ControllerServiceNode extends ConfiguredComponent {
 
     void removeReference(ConfiguredComponent referringComponent);
     
+    void verifyCanEnable();
+    void verifyCanDisable();
     void verifyCanDelete();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
index 1d90a3a..1b7a3c0 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -2522,6 +2522,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
             throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
         }
 
+        reportingTaskNode.verifyCanStart();
+        
         processScheduler.schedule(reportingTaskNode);
     }
 
@@ -2530,6 +2532,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
             return;
         }
 
+        reportingTaskNode.verifyCanStop();
+        
         processScheduler.unschedule(reportingTaskNode);
     }
 
@@ -2554,18 +2558,26 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
 
 
     public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
+        reportingTaskNode.verifyCanEnable();
+        
         processScheduler.enableReportingTask(reportingTaskNode);
     }
     
     public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
+        reportingTaskNode.verifyCanDisable();
+        
         processScheduler.disableReportingTask(reportingTaskNode);
     }
     
     public void enableControllerService(final ControllerServiceNode serviceNode) {
+        serviceNode.verifyCanEnable();
+        
         processScheduler.enableControllerService(serviceNode);
     }
     
     public void disableControllerService(final ControllerServiceNode serviceNode) {
+        serviceNode.verifyCanDisable();
+        
         processScheduler.disableControllerService(serviceNode);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 8b10a84..f299781 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -159,4 +159,5 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
             throw new IllegalStateException(this + " is running");
         }
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/850396cc/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index bf0039a..1deed59 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -34,7 +34,6 @@ import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ControllerServiceAlreadyExistsException;
 import org.apache.nifi.controller.exception.ControllerServiceNotFoundException;


[3/6] incubator-nifi git commit: NIFI-4: Added OnEnabled and OnDisabled annotations to the lifecycle package

Posted by ma...@apache.org.
NIFI-4: Added OnEnabled and OnDisabled annotations to the lifecycle package


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7bcfc93d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7bcfc93d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7bcfc93d

Branch: refs/heads/annotations
Commit: 7bcfc93d6e102691d0e7d7d6b4bc5efb223e8349
Parents: 68707ce
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 13:41:53 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 13:41:53 2015 -0500

----------------------------------------------------------------------
 .../nifi/annotation/lifecycle/OnDisabled.java   | 46 ++++++++++++++++++++
 .../nifi/annotation/lifecycle/OnEnabled.java    | 46 ++++++++++++++++++++
 2 files changed, 92 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7bcfc93d/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java
new file mode 100644
index 0000000..0f78010
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnDisabled.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.lifecycle;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation a {@link org.apache.nifi.processor.Processor Processor},
+ * {@link org.apache.nifi.controller.ControllerService ControllerService} or 
+ * {@link org.apache.nifi.reporting.ReportingTask ReportingTask}  
+ * can use to indicate a method should be called whenever the component is disabled. 
+ *
+ * <p>
+ * Methods using this annotation must take no arguments. If a method with this annotation
+ * throws a Throwable, a log message and bulletin will be issued for the component, but
+ * the component will still be disabled.
+ * </p>
+ *
+ * @author none
+ */
+@Documented
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface OnDisabled {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7bcfc93d/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java
new file mode 100644
index 0000000..a0d7a14
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/annotation/lifecycle/OnEnabled.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.annotation.lifecycle;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marker annotation a {@link org.apache.nifi.processor.Processor Processor},
+ * {@link org.apache.nifi.controller.ControllerService ControllerService} or 
+ * {@link org.apache.nifi.reporting.ReportingTask ReportingTask}  
+ * can use to indicate a method should be called whenever the component is enabled. 
+ *
+ * <p>
+ * Methods using this annotation must take no arguments. If a method with this annotation
+ * throws a Throwable, a log message and bulletin will be issued for the component, but
+ * the component will still be enabled.
+ * </p>
+ *
+ * @author none
+ */
+@Documented
+@Target({ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+@Inherited
+public @interface OnEnabled {
+
+}


[4/6] incubator-nifi git commit: NIFI-277: Added verifyCanXX methods

Posted by ma...@apache.org.
NIFI-277: Added verifyCanXX methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d734220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d734220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d734220d

Branch: refs/heads/annotations
Commit: d734220d1e59ff02878a2b9f3913348e8d38ae17
Parents: 7bcfc93
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 15:51:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 15:51:34 2015 -0500

----------------------------------------------------------------------
 .../nifi/controller/ReportingTaskNode.java      | 16 +++++
 .../service/ControllerServiceNode.java          |  6 +-
 .../reporting/AbstractReportingTaskNode.java    | 51 ++++++++++++++++
 .../service/StandardControllerServiceNode.java  | 61 +++++++++++++++++---
 4 files changed, 126 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index 6b8ede0..f456ddd 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -53,4 +53,20 @@ public interface ReportingTaskNode extends ConfiguredComponent {
     ConfigurationContext getConfigurationContext();
 
     boolean isRunning();
+    
+    /**
+     * Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
+     * value of stopped does NOT indicate that the <code>ReportingTask</code> has
+     * no active threads, only that it is not currently scheduled to be given
+     * any more threads. To determine whether or not the
+     * <code>ReportingTask</code> has any active threads, see
+     * {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}.
+     *
+     * @return
+     */
+    ScheduledState getScheduledState();
+    
+    void setScheduledState(ScheduledState state);
+    
+    void verifyCanDelete();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 6f9c237..dd4b49a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -22,7 +22,9 @@ import org.apache.nifi.controller.ControllerService;
 
 public interface ControllerServiceNode extends ConfiguredComponent {
 
-    ControllerService getControllerService();
+    ControllerService getProxiedControllerService();
+    
+    ControllerService getControllerServiceImplementation();
 
     Availability getAvailability();
 
@@ -37,4 +39,6 @@ public interface ControllerServiceNode extends ConfiguredComponent {
     void addReference(ConfiguredComponent referringComponent);
 
     void removeReference(ConfiguredComponent referringComponent);
+    
+    void verifyCanDelete();
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 6c27470..8b10a84 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -19,18 +19,25 @@ package org.apache.nifi.controller.reporting;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.nifi.annotation.lifecycle.OnAdded;
 import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.Availability;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.ReflectionUtils;
 
 public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
 
@@ -42,6 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
     private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
     private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
 
+    private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
+    
     public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
             final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
             final ValidationContextFactory validationContextFactory) {
@@ -108,4 +117,46 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
         }
     }
 
+    @Override
+    public ScheduledState getScheduledState() {
+        return scheduledState;
+    }
+    
+    @Override
+    public void setScheduledState(final ScheduledState state) {
+        this.scheduledState = state;
+    }
+    
+    @Override
+    public void setProperty(final String name, final String value) {
+        super.setProperty(name, value);
+        
+        onConfigured();
+    }
+    
+    @Override
+    public boolean removeProperty(String name) {
+        final boolean removed = super.removeProperty(name);
+        if ( removed ) {
+            onConfigured();
+        }
+        
+        return removed;
+    }
+    
+    private void onConfigured() {
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
+            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
+        } catch (final Exception e) {
+            throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
+        }
+    }
+    
+    @Override
+    public void verifyCanDelete() {
+        if (isRunning()) {
+            throw new IllegalStateException(this + " is running");
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 455eac1..61a3aa8 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -26,13 +26,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.Availability;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.util.ReflectionUtils;
 
 public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
 
-    private final ControllerService controllerService;
+    private final ControllerService proxedControllerService;
+    private final ControllerService implementation;
+    private final ControllerServiceProvider serviceProvider;
 
     private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
     private final AtomicBoolean disabled = new AtomicBoolean(true);
@@ -43,10 +50,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
 
     private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
 
-    public StandardControllerServiceNode(final ControllerService controllerService, final String id,
+    public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
             final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
-        super(controllerService, id, validationContextFactory, serviceProvider);
-        this.controllerService = controllerService;
+        super(proxiedControllerService, id, validationContextFactory, serviceProvider);
+        this.proxedControllerService = proxiedControllerService;
+        this.implementation = implementation;
+        this.serviceProvider = serviceProvider;
     }
 
     @Override
@@ -57,7 +66,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     @Override
     public void setDisabled(final boolean disabled) {
         if (!disabled && !isValid()) {
-            throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid");
+            throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
         }
 
         if (disabled) {
@@ -82,8 +91,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     }
 
     @Override
-    public ControllerService getControllerService() {
-        return controllerService;
+    public ControllerService getProxiedControllerService() {
+        return proxedControllerService;
+    }
+    
+    @Override
+    public ControllerService getControllerServiceImplementation() {
+        return implementation;
     }
 
     @Override
@@ -122,4 +136,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
             throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
         }
     }
+    
+    @Override
+    public void setProperty(final String name, final String value) {
+        super.setProperty(name, value);
+        
+        onConfigured();
+    }
+    
+    @Override
+    public boolean removeProperty(String name) {
+        final boolean removed = super.removeProperty(name);
+        if ( removed ) {
+            onConfigured();
+        }
+        
+        return removed;
+    }
+    
+    private void onConfigured() {
+        try (final NarCloseable x = NarCloseable.withNarLoader()) {
+            final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
+            ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
+        } catch (final Exception e) {
+            throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
+        }
+    }
+    
+    @Override
+    public void verifyCanDelete() {
+        if ( !isDisabled() ) {
+            throw new IllegalStateException(this + " cannot be deleted because it has not been disabled");
+        }
+    }
 }