You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/01/26 15:19:33 UTC
[09/48] incubator-nifi git commit: NIFI-277: Added verifyCanXX methods
NIFI-277: Added verifyCanXX methods
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/d734220d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/d734220d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/d734220d
Branch: refs/heads/NIFI-250
Commit: d734220d1e59ff02878a2b9f3913348e8d38ae17
Parents: 7bcfc93
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Jan 16 15:51:34 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jan 16 15:51:34 2015 -0500
----------------------------------------------------------------------
.../nifi/controller/ReportingTaskNode.java | 16 +++++
.../service/ControllerServiceNode.java | 6 +-
.../reporting/AbstractReportingTaskNode.java | 51 ++++++++++++++++
.../service/StandardControllerServiceNode.java | 61 +++++++++++++++++---
4 files changed, 126 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
index 6b8ede0..f456ddd 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/ReportingTaskNode.java
@@ -53,4 +53,20 @@ public interface ReportingTaskNode extends ConfiguredComponent {
ConfigurationContext getConfigurationContext();
boolean isRunning();
+
+ /**
+ * Indicates the {@link ScheduledState} of this <code>ReportingTask</code>. A
+ * value of stopped does NOT indicate that the <code>ReportingTask</code> has
+ * no active threads, only that it is not currently scheduled to be given
+ * any more threads. To determine whether or not the
+ * <code>ReportingTask</code> has any active threads, see
+ * {@link ProcessScheduler#getActiveThreadCount(ReportingTask)}.
+ *
+ * @return
+ */
+ ScheduledState getScheduledState();
+
+ void setScheduledState(ScheduledState state);
+
+ void verifyCanDelete();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 6f9c237..dd4b49a 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -22,7 +22,9 @@ import org.apache.nifi.controller.ControllerService;
public interface ControllerServiceNode extends ConfiguredComponent {
- ControllerService getControllerService();
+ ControllerService getProxiedControllerService();
+
+ ControllerService getControllerServiceImplementation();
Availability getAvailability();
@@ -37,4 +39,6 @@ public interface ControllerServiceNode extends ConfiguredComponent {
void addReference(ConfiguredComponent referringComponent);
void removeReference(ConfiguredComponent referringComponent);
+
+ void verifyCanDelete();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
index 6c27470..8b10a84 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/reporting/AbstractReportingTaskNode.java
@@ -19,18 +19,25 @@ package org.apache.nifi.controller.reporting;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.Availability;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.controller.ScheduledState;
+import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.service.StandardConfigurationContext;
+import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.FormatUtils;
+import org.apache.nifi.util.ReflectionUtils;
public abstract class AbstractReportingTaskNode extends AbstractConfiguredComponent implements ReportingTaskNode {
@@ -42,6 +49,8 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
private final AtomicReference<String> schedulingPeriod = new AtomicReference<>("5 mins");
private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
+ private volatile ScheduledState scheduledState = ScheduledState.STOPPED;
+
public AbstractReportingTaskNode(final ReportingTask reportingTask, final String id,
final ControllerServiceProvider controllerServiceProvider, final ProcessScheduler processScheduler,
final ValidationContextFactory validationContextFactory) {
@@ -108,4 +117,46 @@ public abstract class AbstractReportingTaskNode extends AbstractConfiguredCompon
}
}
+ @Override
+ public ScheduledState getScheduledState() {
+ return scheduledState;
+ }
+
+ @Override
+ public void setScheduledState(final ScheduledState state) {
+ this.scheduledState = state;
+ }
+
+ @Override
+ public void setProperty(final String name, final String value) {
+ super.setProperty(name, value);
+
+ onConfigured();
+ }
+
+ @Override
+ public boolean removeProperty(String name) {
+ final boolean removed = super.removeProperty(name);
+ if ( removed ) {
+ onConfigured();
+ }
+
+ return removed;
+ }
+
+ private void onConfigured() {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceLookup);
+ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, reportingTask, configContext);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + reportingTask, e);
+ }
+ }
+
+ @Override
+ public void verifyCanDelete() {
+ if (isRunning()) {
+ throw new IllegalStateException(this + " is running");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/d734220d/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 455eac1..61a3aa8 100644
--- a/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -26,13 +26,20 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.Availability;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ValidationContextFactory;
+import org.apache.nifi.controller.annotation.OnConfigured;
+import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
+import org.apache.nifi.nar.NarCloseable;
+import org.apache.nifi.util.ReflectionUtils;
public class StandardControllerServiceNode extends AbstractConfiguredComponent implements ControllerServiceNode {
- private final ControllerService controllerService;
+ private final ControllerService proxedControllerService;
+ private final ControllerService implementation;
+ private final ControllerServiceProvider serviceProvider;
private final AtomicReference<Availability> availability = new AtomicReference<>(Availability.NODE_ONLY);
private final AtomicBoolean disabled = new AtomicBoolean(true);
@@ -43,10 +50,12 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
private final Set<ConfiguredComponent> referencingComponents = new HashSet<>();
- public StandardControllerServiceNode(final ControllerService controllerService, final String id,
+ public StandardControllerServiceNode(final ControllerService proxiedControllerService, final ControllerService implementation, final String id,
final ValidationContextFactory validationContextFactory, final ControllerServiceProvider serviceProvider) {
- super(controllerService, id, validationContextFactory, serviceProvider);
- this.controllerService = controllerService;
+ super(proxiedControllerService, id, validationContextFactory, serviceProvider);
+ this.proxedControllerService = proxiedControllerService;
+ this.implementation = implementation;
+ this.serviceProvider = serviceProvider;
}
@Override
@@ -57,7 +66,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void setDisabled(final boolean disabled) {
if (!disabled && !isValid()) {
- throw new IllegalStateException("Cannot enable Controller Service " + controllerService + " because it is not valid");
+ throw new IllegalStateException("Cannot enable Controller Service " + implementation + " because it is not valid");
}
if (disabled) {
@@ -82,8 +91,13 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
}
@Override
- public ControllerService getControllerService() {
- return controllerService;
+ public ControllerService getProxiedControllerService() {
+ return proxedControllerService;
+ }
+
+ @Override
+ public ControllerService getControllerServiceImplementation() {
+ return implementation;
}
@Override
@@ -122,4 +136,37 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
throw new IllegalStateException("Cannot modify Controller Service configuration because it is currently enabled. Please disable the Controller Service first.");
}
}
+
+ @Override
+ public void setProperty(final String name, final String value) {
+ super.setProperty(name, value);
+
+ onConfigured();
+ }
+
+ @Override
+ public boolean removeProperty(String name) {
+ final boolean removed = super.removeProperty(name);
+ if ( removed ) {
+ onConfigured();
+ }
+
+ return removed;
+ }
+
+ private void onConfigured() {
+ try (final NarCloseable x = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(this, serviceProvider);
+ ReflectionUtils.invokeMethodsWithAnnotation(OnConfigured.class, implementation, configContext);
+ } catch (final Exception e) {
+ throw new ProcessorLifeCycleException("Failed to invoke On-Configured Lifecycle methods of " + implementation, e);
+ }
+ }
+
+ @Override
+ public void verifyCanDelete() {
+ if ( !isDisabled() ) {
+ throw new IllegalStateException(this + " cannot be deleted because it has not been disabled");
+ }
+ }
}