You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2015/04/10 15:43:55 UTC
[20/62] [abbrv] incubator-nifi git commit: Squashed commit of the
following:
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
index df3c251..09479d5 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/ValidationContextFactory.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller;
import java.util.Map;
+import java.util.Set;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
@@ -24,4 +25,7 @@ import org.apache.nifi.components.ValidationContext;
public interface ValidationContextFactory {
ValidationContext newValidationContext(Map<PropertyDescriptor, String> properties, String annotationData);
+
+ ValidationContext newValidationContext(Set<String> serviceIdentifiersToNotValidate, Map<PropertyDescriptor, String> properties, String annotationData);
+
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
new file mode 100644
index 0000000..18cfcda
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceInstantiationException.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.exception;
+
+public class ControllerServiceInstantiationException extends RuntimeException {
+
+ private static final long serialVersionUID = -544424320587059277L;
+
+ /**
+ * Constructs an exception with no detail message and no cause
+ */
+ public ControllerServiceInstantiationException() {
+ super();
+ }
+
+ /**
+ * @param message the detail message, passed through to {@link RuntimeException}
+ */
+ public ControllerServiceInstantiationException(String message) {
+ super(message);
+ }
+
+ /**
+ * @param cause the underlying cause, passed through to {@link RuntimeException}
+ */
+ public ControllerServiceInstantiationException(Throwable cause) {
+ super(cause);
+ }
+
+ /**
+ * @param message the detail message, passed through to {@link RuntimeException}
+ * @param cause the underlying cause, passed through to {@link RuntimeException}
+ */
+ public ControllerServiceInstantiationException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
deleted file mode 100644
index 4cdbe54..0000000
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/exception/ControllerServiceNotFoundException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller.exception;
-
-public class ControllerServiceNotFoundException extends RuntimeException {
-
- private static final long serialVersionUID = -544424320587059277L;
-
- /**
- * Constructs a default exception
- */
- public ControllerServiceNotFoundException() {
- super();
- }
-
- /**
- * @param message
- */
- public ControllerServiceNotFoundException(String message) {
- super(message);
- }
-
- /**
- * @param cause
- */
- public ControllerServiceNotFoundException(Throwable cause) {
- super(cause);
- }
-
- /**
- * @param message
- * @param cause
- */
- public ControllerServiceNotFoundException(String message, Throwable cause) {
- super(message, cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
new file mode 100644
index 0000000..bb6f3f7
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/reporting/ReportingTaskProvider.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.reporting;
+
+import java.util.Set;
+import org.apache.nifi.controller.ReportingTaskNode;
+
+/**
+ * A ReportingTaskProvider is responsible for providing management of, and access to, Reporting Tasks
+ */
+public interface ReportingTaskProvider {
+
+ /**
+ * Creates a new instance of a reporting task
+ *
+ * @param type the type (fully qualified class name) of the reporting task to instantiate
+ * @param id the identifier for the Reporting Task
+ * @param firstTimeAdded whether or not this is the first time that the reporting task is being added
+ * to the flow. I.e., this will be true only when the user adds the reporting task to the flow, not when
+ * the flow is being restored after a restart of the software
+ *
+ * @return the ReportingTaskNode that is used to manage the reporting task
+ *
+ * @throws ReportingTaskInstantiationException if unable to create the Reporting Task
+ */
+ ReportingTaskNode createReportingTask(String type, String id, boolean firstTimeAdded) throws ReportingTaskInstantiationException;
+
+ /**
+ * Returns the reporting task that has the given identifier, or <code>null</code> if no reporting task
+ * exists with that ID.
+ *
+ * @param identifier the identifier of the reporting task to look up
+ * @return the reporting task with the given identifier, or <code>null</code> if none exists
+ */
+ ReportingTaskNode getReportingTaskNode(String identifier);
+
+ /**
+ * Returns a Set of all Reporting Tasks that exist for this service provider.
+ * @return a Set of all Reporting Tasks that exist for this service provider
+ */
+ Set<ReportingTaskNode> getAllReportingTasks();
+
+ /**
+ * Removes the given reporting task from the flow
+ *
+ * @param reportingTask the reporting task to remove from the flow
+ *
+ * @throws IllegalStateException if the reporting task cannot be removed because it is not stopped, or
+ * if the reporting task is not known in the flow
+ */
+ void removeReportingTask(ReportingTaskNode reportingTask);
+
+ /**
+ * Begins scheduling the reporting task to run and invokes appropriate lifecycle methods
+ * @param reportingTask the reporting task to start
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active
+ * threads, or if the ReportingTask is not valid
+ */
+ void startReportingTask(ReportingTaskNode reportingTask);
+
+ /**
+ * Stops scheduling the reporting task to run and invokes appropriate lifecycle methods
+ * @param reportingTask the reporting task to stop
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not RUNNING
+ */
+ void stopReportingTask(ReportingTaskNode reportingTask);
+
+
+ /**
+ * Enables the reporting task to be scheduled to run
+ * @param reportingTask the reporting task to enable
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not DISABLED
+ */
+ void enableReportingTask(ReportingTaskNode reportingTask);
+
+
+ /**
+ * Disables the ability to schedule the reporting task to run
+ *
+ * @param reportingTask the reporting task to disable
+ *
+ * @throws IllegalStateException if the ReportingTask's state is not STOPPED, or if the Reporting Task has active
+ * threads
+ */
+ void disableReportingTask(ReportingTaskNode reportingTask);
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index 66bad39..50bf469 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -16,7 +16,8 @@
*/
package org.apache.nifi.controller.service;
-import org.apache.nifi.controller.Availability;
+import java.util.Set;
+
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
@@ -26,22 +27,42 @@ public interface ControllerServiceNode extends ConfiguredComponent {
ControllerService getControllerServiceImplementation();
- Availability getAvailability();
-
- void setAvailability(Availability availability);
-
- boolean isDisabled();
-
- void setDisabled(boolean disabled);
-
+ ControllerServiceState getState();
+ void setState(ControllerServiceState state);
+
ControllerServiceReference getReferences();
void addReference(ConfiguredComponent referringComponent);
void removeReference(ConfiguredComponent referringComponent);
+ void setComments(String comment);
+ String getComments();
+
void verifyCanEnable();
void verifyCanDisable();
+
+ /**
+ * Verifies that this Controller Service can be disabled if the provided set of
+ * services are also disabled. This is introduced because we can have an instance
+ * where A references B, which references C, which references A and we want
+ * to disable service C. In this case, the cycle needs to not cause us to fail,
+ * so we want to verify that C can be disabled if A and B also are.
+ *
+ * @param ignoredReferences
+ */
+ void verifyCanDisable(Set<ControllerServiceNode> ignoredReferences);
+
+ /**
+ * Verifies that this Controller Service can be enabled if the provided set of
+ * services are also enabled. This is introduced because we can have an instance where
+ * A references B, which references C, which references A and we want to enable
+ * Service A. In this case, the cycle needs to not cause us to fail, so we want to verify
+ * that A can be enabled if B and C also are.
+ * @param ignoredReferences
+ */
+ void verifyCanEnable(Set<ControllerServiceNode> ignoredReferences);
+
void verifyCanDelete();
void verifyCanUpdate();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
index 5f2fc2e..1901fb6 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceProvider.java
@@ -16,6 +16,9 @@
*/
package org.apache.nifi.controller.service;
+import java.util.Collection;
+import java.util.Set;
+
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.controller.ControllerServiceLookup;
@@ -25,7 +28,7 @@ import org.apache.nifi.controller.ControllerServiceLookup;
public interface ControllerServiceProvider extends ControllerServiceLookup {
/**
- * Creates a new Controller Service of the given type and assigns it the given id. If <code>firstTimeadded</code>
+ * Creates a new Controller Service of the specified type and assigns it the given id. If <code>firstTimeAdded</code>
* is true, calls any methods that are annotated with {@link OnAdded}
*
* @param type
@@ -61,9 +64,87 @@ public interface ControllerServiceProvider extends ControllerServiceLookup {
void enableControllerService(ControllerServiceNode serviceNode);
/**
+ * Enables the collection of services. If a service in this collection depends on another service,
+ * the service being depended on must either already be enabled or must be in the collection as well.
+ * @param serviceNodes
+ */
+ void enableControllerServices(Collection<ControllerServiceNode> serviceNodes);
+
+ /**
* Disables the given controller service so that it cannot be used by other components. This allows
* configuration to be updated or allows service to be removed.
* @param serviceNode
*/
void disableControllerService(ControllerServiceNode serviceNode);
+
+ /**
+ * Returns a Set of all Controller Services that exist for this service provider.
+ * @return
+ */
+ Set<ControllerServiceNode> getAllControllerServices();
+
+ /**
+ * Verifies that all running Processors and Reporting Tasks referencing the Controller Service (or a service
+ * that depends on the provided service) can be stopped.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing component cannot be stopped
+ */
+ void verifyCanStopReferencingComponents(ControllerServiceNode serviceNode);
+
+ /**
+ * Recursively unschedules all schedulable components (Processors and Reporting Tasks) that reference the given
+ * Controller Service. For any Controller services that reference this one, its schedulable referencing components will also
+ * be unscheduled.
+ * @param serviceNode
+ */
+ void unscheduleReferencingComponents(ControllerServiceNode serviceNode);
+
+ /**
+ * Verifies that all Controller Services referencing the provided Controller Service can be disabled.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing service cannot be disabled
+ */
+ void verifyCanDisableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
+ * Disables any Controller Service that references the provided Controller Service. This action is performed recursively
+ * so that if service A references B and B references C, disabling references for C will first disable A, then B.
+ * @param serviceNode
+ */
+ void disableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
+ * Verifies that all Controller Services referencing the provided ControllerService can be enabled.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing component cannot be enabled
+ */
+ void verifyCanEnableReferencingServices(ControllerServiceNode serviceNode);
+
+
+ /**
+ * Enables all Controller Services that are referencing the given service. If Service A references Service B and Service
+ * B references serviceNode, Service A and B will both be enabled.
+ * @param serviceNode
+ */
+ void enableReferencingServices(ControllerServiceNode serviceNode);
+
+ /**
+ * Verifies that all enabled Processors referencing the ControllerService (or a service that depends on
+ * the provided service) can be scheduled to run.
+ * @param serviceNode
+ *
+ * @throws IllegalStateException if any referencing component cannot be scheduled
+ */
+ void verifyCanScheduleReferencingComponents(ControllerServiceNode serviceNode);
+
+ /**
+ * Schedules any schedulable component (Processor, ReportingTask) that is referencing the given Controller Service
+ * to run. This is performed recursively, so if a Processor is referencing Service A, which is referencing serviceNode,
+ * then the Processor will also be started.
+ * @param serviceNode
+ */
+ void scheduleReferencingComponents(ControllerServiceNode serviceNode);
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
index 5cb676f..67ffb6c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceReference.java
@@ -41,10 +41,11 @@ public interface ControllerServiceReference {
Set<ConfiguredComponent> getReferencingComponents();
/**
- * Returns a {@link Set} of all Processors and Reporting Tasks that are
- * referencing the Controller Service and are running, in addition to all
+ * Returns a {@link Set} of all Processors, Reporting Tasks, and Controller Services that are
+ * referencing the Controller Service and are running (in the case of Processors and Reporting Tasks)
+ * or enabled (in the case of Controller Services)
*
* @return
*/
- Set<ConfiguredComponent> getRunningReferences();
+ Set<ConfiguredComponent> getActiveReferences();
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
new file mode 100644
index 0000000..2ed8fd9
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceState.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.service;
+
+
+/**
+ * Represents the valid states for a Controller Service.
+ */
+public enum ControllerServiceState {
+ /**
+ * Controller Service is disabled and cannot be used.
+ */
+ DISABLED,
+
+ /**
+ * Controller Service has been disabled but has not yet finished its lifecycle
+ * methods.
+ */
+ DISABLING,
+
+ /**
+ * Controller Service has been enabled but has not yet finished its lifecycle methods.
+ */
+ ENABLING,
+
+ /**
+ * Controller Service has been enabled and has finished its lifecycle methods. The Controller Service
+ * is ready to be used.
+ */
+ ENABLED;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 06ef203..f3fb67c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -52,6 +52,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnRemoved;
+import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.DataFlow;
@@ -62,6 +63,7 @@ import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
@@ -77,6 +79,8 @@ import org.apache.nifi.controller.exception.ProcessorLifeCycleException;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.label.StandardLabel;
import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.ReportingTaskProvider;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
import org.apache.nifi.controller.reporting.StandardReportingTaskNode;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.CounterRepository;
@@ -103,6 +107,7 @@ import org.apache.nifi.controller.scheduling.StandardProcessScheduler;
import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
+import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.PortStatus;
@@ -129,6 +134,7 @@ import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
@@ -161,6 +167,8 @@ import org.apache.nifi.remote.protocol.socket.SocketFlowFileServerProtocol;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.EventAccess;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -170,6 +178,7 @@ import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.ReflectionUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
@@ -189,7 +198,7 @@ import org.slf4j.LoggerFactory;
import com.sun.jersey.api.client.ClientHandlerException;
-public class FlowController implements EventAccess, ControllerServiceProvider, Heartbeater, QueueProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider {
// default repository implementations
public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -374,7 +383,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
this.properties = properties;
sslContext = SslContextFactory.createSslContext(properties, false);
extensionManager = new ExtensionManager();
- controllerServiceProvider = new StandardControllerServiceProvider();
timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process"));
eventDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxEventDrivenThreads.get(), "Event-Driven Process"));
@@ -398,6 +406,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
processScheduler = new StandardProcessScheduler(this, this, encryptor);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
+ controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository);
final ProcessContextFactory contextFactory = new ProcessContextFactory(contentRepository, flowFileRepository, flowFileEventRepository, counterRepositoryRef.get(), provenanceEventRepository);
processScheduler.setSchedulingAgent(SchedulingStrategy.EVENT_DRIVEN, new EventDrivenSchedulingAgent(
@@ -593,7 +602,10 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
startConnectable(connectable);
}
} catch (final Throwable t) {
- LOG.error("Unable to start {} due to {}", new Object[]{connectable, t});
+ LOG.error("Unable to start {} due to {}", new Object[]{connectable, t.toString()});
+ if ( LOG.isDebugEnabled() ) {
+ LOG.error("", t);
+ }
}
}
@@ -1063,7 +1075,23 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
// Trigger any processors' methods marked with @OnShutdown to be called
rootGroup.shutdown();
-
+
+ // invoke any methods annotated with @OnShutdown on Controller Services
+ for ( final ControllerServiceNode serviceNode : getAllControllerServices() ) {
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ final ConfigurationContext configContext = new StandardConfigurationContext(serviceNode, controllerServiceProvider);
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, serviceNode.getControllerServiceImplementation(), configContext);
+ }
+ }
+
+ // invoke any methods annotated with @OnShutdown on Reporting Tasks
+ for ( final ReportingTaskNode taskNode : getAllReportingTasks() ) {
+ final ConfigurationContext configContext = taskNode.getConfigurationContext();
+ try (final NarCloseable narCloseable = NarCloseable.withNarLoader()) {
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnShutdown.class, taskNode.getReportingTask(), configContext);
+ }
+ }
+
try {
this.timerDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
this.eventDrivenEngineRef.get().awaitTermination(gracefulShutdownSeconds / 2, TimeUnit.SECONDS);
@@ -1402,6 +1430,30 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
validateSnippetContents(requireNonNull(group), dto);
//
+ // Instantiate Controller Services
+ //
+ for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) {
+ final ControllerServiceNode serviceNode = createControllerService(controllerServiceDTO.getType(), controllerServiceDTO.getId(), true);
+
+ serviceNode.setAnnotationData(controllerServiceDTO.getAnnotationData());
+ serviceNode.setComments(controllerServiceDTO.getComments());
+ serviceNode.setName(controllerServiceDTO.getName());
+ }
+
+ // configure controller services. We do this after creating all of them in case 1 service
+ // references another service.
+ for ( final ControllerServiceDTO controllerServiceDTO : dto.getControllerServices() ) {
+ final String serviceId = controllerServiceDTO.getId();
+ final ControllerServiceNode serviceNode = getControllerServiceNode(serviceId);
+
+ for ( final Map.Entry<String, String> entry : controllerServiceDTO.getProperties().entrySet() ) {
+ if ( entry.getValue() != null ) {
+ serviceNode.setProperty(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ //
// Instantiate the labels
//
for (final LabelDTO labelDTO : dto.getLabels()) {
@@ -1411,7 +1463,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
label.setSize(new Size(labelDTO.getWidth(), labelDTO.getHeight()));
}
- // TODO: Update the label's "style"
+ label.setStyle(labelDTO.getStyle());
group.addLabel(label);
}
@@ -1737,14 +1789,18 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
// validate that all Processor Types and Prioritizer Types are valid
- final List<String> processorClasses = new ArrayList<>();
+ final Set<String> processorClasses = new HashSet<>();
for (final Class<?> c : ExtensionManager.getExtensions(Processor.class)) {
processorClasses.add(c.getName());
}
- final List<String> prioritizerClasses = new ArrayList<>();
+ final Set<String> prioritizerClasses = new HashSet<>();
for (final Class<?> c : ExtensionManager.getExtensions(FlowFilePrioritizer.class)) {
prioritizerClasses.add(c.getName());
}
+ final Set<String> controllerServiceClasses = new HashSet<>();
+ for (final Class<?> c : ExtensionManager.getExtensions(ControllerService.class)) {
+ controllerServiceClasses.add(c.getName());
+ }
final Set<ProcessorDTO> allProcs = new HashSet<>();
final Set<ConnectionDTO> allConns = new HashSet<>();
@@ -1760,6 +1816,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
throw new IllegalStateException("Invalid Processor Type: " + proc.getType());
}
}
+
+ final Set<ControllerServiceDTO> controllerServices = templateContents.getControllerServices();
+ if (controllerServices != null) {
+ for (final ControllerServiceDTO service : controllerServices) {
+ if (!controllerServiceClasses.contains(service.getType())) {
+ throw new IllegalStateException("Invalid Controller Service Type: " + service.getType());
+ }
+ }
+ }
for (final ConnectionDTO conn : allConns) {
final List<String> prioritizers = conn.getPrioritizers();
@@ -2480,17 +2545,20 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
lookupGroup(groupId).stopProcessing();
}
- public ReportingTaskNode createReportingTask(final String type, String id) throws ReportingTaskInstantiationException {
- return createReportingTask(type, id, true);
+ public ReportingTaskNode createReportingTask(final String type) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, true);
+ }
+
+ public ReportingTaskNode createReportingTask(final String type, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ return createReportingTask(type, UUID.randomUUID().toString(), firstTimeAdded);
}
- public ReportingTaskNode createReportingTask(final String type, String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
- if (type == null) {
+ @Override
+ public ReportingTaskNode createReportingTask(final String type, final String id, final boolean firstTimeAdded) throws ReportingTaskInstantiationException {
+ if (type == null || id == null) {
throw new NullPointerException();
}
-
- id = requireNonNull(id).intern();
-
+
ReportingTask task = null;
final ClassLoader ctxClassLoader = Thread.currentThread().getContextClassLoader();
try {
@@ -2516,8 +2584,19 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
final ValidationContextFactory validationContextFactory = new StandardValidationContextFactory(controllerServiceProvider);
final ReportingTaskNode taskNode = new StandardReportingTaskNode(task, id, this, processScheduler, validationContextFactory);
+ taskNode.setName(task.getClass().getSimpleName());
if ( firstTimeAdded ) {
+ final ComponentLog componentLog = new SimpleProcessLogger(id, taskNode.getReportingTask());
+ final ReportingInitializationContext config = new StandardReportingInitializationContext(id, taskNode.getName(),
+ SchedulingStrategy.TIMER_DRIVEN, "1 min", componentLog, this);
+
+ try {
+ task.initialize(config);
+ } catch (final InitializationException ie) {
+ throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + type, ie);
+ }
+
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, task);
} catch (final Exception e) {
@@ -2529,30 +2608,33 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
return taskNode;
}
+ @Override
public ReportingTaskNode getReportingTaskNode(final String taskId) {
return reportingTasks.get(taskId);
}
+ @Override
public void startReportingTask(final ReportingTaskNode reportingTaskNode) {
if (isTerminated()) {
throw new IllegalStateException("Cannot start reporting task " + reportingTaskNode + " because the controller is terminated");
}
reportingTaskNode.verifyCanStart();
-
- processScheduler.schedule(reportingTaskNode);
+ processScheduler.schedule(reportingTaskNode);
}
+
+ @Override
public void stopReportingTask(final ReportingTaskNode reportingTaskNode) {
if (isTerminated()) {
return;
}
reportingTaskNode.verifyCanStop();
-
processScheduler.unschedule(reportingTaskNode);
}
+ @Override
public void removeReportingTask(final ReportingTaskNode reportingTaskNode) {
final ReportingTaskNode existing = reportingTasks.get(reportingTaskNode.getIdentifier());
if ( existing == null || existing != reportingTaskNode ) {
@@ -2565,44 +2647,101 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, reportingTaskNode.getReportingTask(), reportingTaskNode.getConfigurationContext());
}
+ for ( final Map.Entry<PropertyDescriptor, String> entry : reportingTaskNode.getProperties().entrySet() ) {
+ final PropertyDescriptor descriptor = entry.getKey();
+ if (descriptor.getControllerServiceDefinition() != null ) {
+ final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
+ if ( value != null ) {
+ final ControllerServiceNode serviceNode = controllerServiceProvider.getControllerServiceNode(value);
+ if ( serviceNode != null ) {
+ serviceNode.removeReference(reportingTaskNode);
+ }
+ }
+ }
+ }
+
reportingTasks.remove(reportingTaskNode.getIdentifier());
}
- Collection<ReportingTaskNode> getReportingTasks() {
- return reportingTasks.values();
+ @Override
+ public Set<ReportingTaskNode> getAllReportingTasks() {
+ return new HashSet<>(reportingTasks.values());
}
-
+ @Override
+ public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
+ return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ }
+
+ @Override
public void enableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanEnable();
-
processScheduler.enableReportingTask(reportingTaskNode);
}
+ @Override
public void disableReportingTask(final ReportingTaskNode reportingTaskNode) {
reportingTaskNode.verifyCanDisable();
-
processScheduler.disableReportingTask(reportingTaskNode);
}
@Override
+ public void disableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.disableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void enableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.enableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.scheduleReferencingComponents(serviceNode);
+ }
+
+ @Override
+ public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.unscheduleReferencingComponents(serviceNode);
+ }
+
+ @Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
- serviceNode.verifyCanEnable();
controllerServiceProvider.enableControllerService(serviceNode);
}
@Override
+ public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
+ controllerServiceProvider.enableControllerServices(serviceNodes);
+ }
+
+ @Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
controllerServiceProvider.disableControllerService(serviceNode);
}
-
+
@Override
- public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id.intern(), firstTimeAdded);
+ public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanEnableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanScheduleReferencingComponents(serviceNode);
}
@Override
+ public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanDisableReferencingServices(serviceNode);
+ }
+
+ @Override
+ public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
+ controllerServiceProvider.verifyCanStopReferencingComponents(serviceNode);
+ }
+
+ @Override
public ControllerService getControllerService(final String serviceIdentifier) {
return controllerServiceProvider.getControllerService(serviceIdentifier);
}
@@ -2623,10 +2762,24 @@ public class FlowController implements EventAccess, ControllerServiceProvider, H
}
@Override
+ public boolean isControllerServiceEnabling(final String serviceIdentifier) {
+ return controllerServiceProvider.isControllerServiceEnabling(serviceIdentifier);
+ }
+
+ @Override
+ public String getControllerServiceName(final String serviceIdentifier) {
+ return controllerServiceProvider.getControllerServiceName(serviceIdentifier);
+ }
+
public void removeControllerService(final ControllerServiceNode serviceNode) {
controllerServiceProvider.removeControllerService(serviceNode);
}
+ @Override
+ public Set<ControllerServiceNode> getAllControllerServices() {
+ return controllerServiceProvider.getAllControllerServices();
+ }
+
//
// Counters
//
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
index c67181a..85ad159 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowFromDOMFactory.java
@@ -26,6 +26,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.connectable.Size;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.remote.StandardRemoteProcessGroupPortDescriptor;
@@ -33,6 +34,7 @@ import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.util.DomUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
@@ -42,7 +44,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
@@ -77,6 +79,40 @@ public class FlowFromDOMFactory {
return styles;
}
+
+ public static ControllerServiceDTO getControllerService(final Element element, final StringEncryptor encryptor) {
+ final ControllerServiceDTO dto = new ControllerServiceDTO();
+
+ dto.setId(getString(element, "id"));
+ dto.setName(getString(element, "name"));
+ dto.setComments(getString(element, "comment"));
+ dto.setType(getString(element, "class"));
+
+ final boolean enabled = getBoolean(element, "enabled");
+ dto.setState(enabled ? ControllerServiceState.ENABLED.name() : ControllerServiceState.DISABLED.name());
+
+ dto.setProperties(getProperties(element, encryptor));
+ dto.setAnnotationData(getString(element, "annotationData"));
+
+ return dto;
+ }
+
+ public static ReportingTaskDTO getReportingTask(final Element element, final StringEncryptor encryptor) {
+ final ReportingTaskDTO dto = new ReportingTaskDTO();
+
+ dto.setId(getString(element, "id"));
+ dto.setName(getString(element, "name"));
+ dto.setComments(getString(element, "comment"));
+ dto.setType(getString(element, "class"));
+ dto.setSchedulingPeriod(getString(element, "schedulingPeriod"));
+ dto.setState(getString(element, "scheduledState"));
+ dto.setSchedulingStrategy(getString(element, "schedulingStrategy"));
+
+ dto.setProperties(getProperties(element, encryptor));
+ dto.setAnnotationData(getString(element, "annotationData"));
+
+ return dto;
+ }
public static ProcessGroupDTO getProcessGroup(final String parentId, final Element element, final StringEncryptor encryptor) {
final ProcessGroupDTO dto = new ProcessGroupDTO();
@@ -310,7 +346,6 @@ public class FlowFromDOMFactory {
final ProcessorConfigDTO configDto = new ProcessorConfigDTO();
dto.setConfig(configDto);
configDto.setComments(getString(element, "comment"));
- configDto.setAnnotationData(getString(element, "annotationData"));
configDto.setConcurrentlySchedulableTaskCount(getInt(element, "maxConcurrentTasks"));
final String schedulingPeriod = getString(element, "schedulingPeriod");
configDto.setSchedulingPeriod(schedulingPeriod);
@@ -334,14 +369,8 @@ public class FlowFromDOMFactory {
configDto.setRunDurationMillis(TimeUnit.NANOSECONDS.toMillis(runDurationNanos));
}
- final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
- final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
- for (final Element propertyElement : propertyNodeList) {
- final String name = getString(propertyElement, "name");
- final String value = decrypt(getString(propertyElement, "value"), encryptor);
- properties.put(name, value);
- }
- configDto.setProperties(properties);
+ configDto.setProperties(getProperties(element, encryptor));
+ configDto.setAnnotationData(getString(element, "annotationData"));
final Set<String> autoTerminatedRelationships = new HashSet<>();
final List<Element> autoTerminateList = getChildrenByTagName(element, "autoTerminatedRelationship");
@@ -353,6 +382,17 @@ public class FlowFromDOMFactory {
return dto;
}
+ private static LinkedHashMap<String, String> getProperties(final Element element, final StringEncryptor encryptor) {
+ final LinkedHashMap<String, String> properties = new LinkedHashMap<>();
+ final List<Element> propertyNodeList = getChildrenByTagName(element, "property");
+ for (final Element propertyElement : propertyNodeList) {
+ final String name = getString(propertyElement, "name");
+ final String value = decrypt(getString(propertyElement, "value"), encryptor);
+ properties.put(name, value);
+ }
+ return properties;
+ }
+
private static String getString(final Element element, final String childElementName) {
final List<Element> nodeList = getChildrenByTagName(element, childElementName);
if (nodeList == null || nodeList.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
index e08a94d..7cd9d3b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSerializer.java
@@ -40,6 +40,8 @@ import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
@@ -47,7 +49,6 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
-
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
@@ -79,6 +80,18 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());
addProcessGroup(rootNode, controller.getGroup(controller.getRootGroupId()), "rootGroup");
+
+ final Element controllerServicesNode = doc.createElement("controllerServices");
+ rootNode.appendChild(controllerServicesNode);
+ for ( final ControllerServiceNode serviceNode : controller.getAllControllerServices() ) {
+ addControllerService(controllerServicesNode, serviceNode, encryptor);
+ }
+
+ final Element reportingTasksNode = doc.createElement("reportingTasks");
+ rootNode.appendChild(reportingTasksNode);
+ for ( final ReportingTaskNode taskNode : controller.getAllReportingTasks() ) {
+ addReportingTask(reportingTasksNode, taskNode, encryptor);
+ }
final DOMSource domSource = new DOMSource(doc);
final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os));
@@ -300,8 +313,16 @@ public class StandardFlowSerializer implements FlowSerializer {
addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));
- // properties.
- for (final Map.Entry<PropertyDescriptor, String> entry : processor.getProperties().entrySet()) {
+ addConfiguration(element, processor.getProperties(), processor.getAnnotationData(), encryptor);
+
+ for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
+ addTextElement(element, "autoTerminatedRelationship", rel.getName());
+ }
+ }
+
+ private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final StringEncryptor encryptor) {
+ final Document doc = element.getOwnerDocument();
+ for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
String value = entry.getValue();
@@ -322,14 +343,9 @@ public class StandardFlowSerializer implements FlowSerializer {
element.appendChild(propElement);
}
- final String annotationData = processor.getAnnotationData();
if (annotationData != null) {
addTextElement(element, "annotationData", annotationData);
}
-
- for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
- addTextElement(element, "autoTerminatedRelationship", rel.getName());
- }
}
private void addConnection(final Element parentElement, final Connection connection) {
@@ -390,11 +406,43 @@ public class StandardFlowSerializer implements FlowSerializer {
parentElement.appendChild(element);
}
- private void addTextElement(final Element element, final String name, final long value) {
+
+ public static void addControllerService(final Element element, final ControllerServiceNode serviceNode, final StringEncryptor encryptor) {
+ final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
+ addTextElement(serviceElement, "id", serviceNode.getIdentifier());
+ addTextElement(serviceElement, "name", serviceNode.getName());
+ addTextElement(serviceElement, "comment", serviceNode.getComments());
+ addTextElement(serviceElement, "class", serviceNode.getControllerServiceImplementation().getClass().getCanonicalName());
+
+ final ControllerServiceState state = serviceNode.getState();
+ final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
+ addTextElement(serviceElement, "enabled", String.valueOf(enabled));
+
+ addConfiguration(serviceElement, serviceNode.getProperties(), serviceNode.getAnnotationData(), encryptor);
+
+ element.appendChild(serviceElement);
+ }
+
+ public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final StringEncryptor encryptor) {
+ final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
+ addTextElement(taskElement, "id", taskNode.getIdentifier());
+ addTextElement(taskElement, "name", taskNode.getName());
+ addTextElement(taskElement, "comment", taskNode.getComments());
+ addTextElement(taskElement, "class", taskNode.getReportingTask().getClass().getCanonicalName());
+ addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod());
+ addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name());
+ addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());
+
+ addConfiguration(taskElement, taskNode.getProperties(), taskNode.getAnnotationData(), encryptor);
+
+ element.appendChild(taskElement);
+ }
+
+ private static void addTextElement(final Element element, final String name, final long value) {
addTextElement(element, name, String.valueOf(value));
}
- private void addTextElement(final Element element, final String name, final String value) {
+ private static void addTextElement(final Element element, final String name, final String value) {
final Document doc = element.getOwnerDocument();
final Element toAdd = doc.createElement(name);
toAdd.setTextContent(value);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
index 64ce5c4..fcfee83 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowService.java
@@ -81,8 +81,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
private final FlowController controller;
private final Path flowXml;
- private final Path taskConfigXml;
- private final Path serviceConfigXml;
private final FlowConfigurationDAO dao;
private final int gracefulShutdownSeconds;
private final boolean autoResumeState;
@@ -154,14 +152,12 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
this.controller = controller;
this.encryptor = encryptor;
flowXml = Paths.get(properties.getProperty(NiFiProperties.FLOW_CONFIGURATION_FILE));
- taskConfigXml = Paths.get(properties.getProperty(NiFiProperties.TASK_CONFIGURATION_FILE));
- serviceConfigXml = Paths.get(properties.getProperty(NiFiProperties.SERVICE_CONFIGURATION_FILE));
gracefulShutdownSeconds = (int) FormatUtils.getTimeDuration(properties.getProperty(NiFiProperties.FLOW_CONTROLLER_GRACEFUL_SHUTDOWN_PERIOD), TimeUnit.SECONDS);
autoResumeState = properties.getAutoResumeState();
connectionRetryMillis = (int) FormatUtils.getTimeDuration(properties.getClusterManagerFlowRetrievalDelay(), TimeUnit.MILLISECONDS);
- dao = new StandardXMLFlowConfigurationDAO(flowXml, taskConfigXml, serviceConfigXml, encryptor);
+ dao = new StandardXMLFlowConfigurationDAO(flowXml, encryptor);
if (configuredForClustering) {
@@ -605,7 +601,6 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
if (firstControllerInitialization) {
// load the controller services
logger.debug("Loading controller services");
- dao.loadControllerServices(controller);
}
// load the flow
@@ -622,7 +617,7 @@ public class StandardFlowService implements FlowService, ProtocolHandler {
logger.debug("First controller initialization. Loading reporting tasks and initializing controller.");
// load the controller tasks
- dao.loadReportingTasks(controller);
+// dao.loadReportingTasks(controller);
// initialize the flow
controller.initializeFlow();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
index 05a8f01..201482c 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowSynchronizer.java
@@ -26,6 +26,7 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -40,6 +41,7 @@ import javax.xml.parsers.ParserConfigurationException;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.connectable.Connectable;
@@ -51,23 +53,35 @@ import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.exception.ProcessorInstantiationException;
import org.apache.nifi.controller.label.Label;
+import org.apache.nifi.controller.reporting.ReportingTaskInstantiationException;
+import org.apache.nifi.controller.reporting.StandardReportingInitializationContext;
+import org.apache.nifi.controller.service.ControllerServiceLoader;
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.events.BulletinFactory;
-import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.fingerprint.FingerprintException;
import org.apache.nifi.fingerprint.FingerprintFactory;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.SimpleProcessLogger;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RootGroupPort;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingInitializationContext;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
+import org.apache.nifi.util.DomUtils;
import org.apache.nifi.util.NiFiProperties;
+import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.web.api.dto.ConnectableDTO;
import org.apache.nifi.web.api.dto.ConnectionDTO;
+import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.FlowSnippetDTO;
import org.apache.nifi.web.api.dto.FunnelDTO;
import org.apache.nifi.web.api.dto.LabelDTO;
@@ -77,9 +91,7 @@ import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.ProcessorConfigDTO;
import org.apache.nifi.web.api.dto.ProcessorDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
-
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.nifi.encrypt.StringEncryptor;
+import org.apache.nifi.web.api.dto.ReportingTaskDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
@@ -96,9 +108,11 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
private static final Logger logger = LoggerFactory.getLogger(StandardFlowSynchronizer.class);
public static final URL FLOW_XSD_RESOURCE = StandardFlowSynchronizer.class.getResource("/FlowConfiguration.xsd");
private final StringEncryptor encryptor;
+ private final boolean autoResumeState;
public StandardFlowSynchronizer(final StringEncryptor encryptor) {
this.encryptor = encryptor;
+ autoResumeState = NiFiProperties.getInstance().getAutoResumeState();
}
public static boolean isEmpty(final DataFlow dataFlow, final StringEncryptor encryptor) {
@@ -157,10 +171,26 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
}
+ final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks");
+ final List<Element> taskElements;
+ if ( reportingTasksElement == null ) {
+ taskElements = Collections.emptyList();
+ } else {
+ taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+ }
+
+ final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices");
+ final List<Element> controllerServiceElements;
+ if ( controllerServicesElement == null ) {
+ controllerServiceElements = Collections.emptyList();
+ } else {
+ controllerServiceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
+ }
+
logger.trace("Parsing process group from DOM");
final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
final ProcessGroupDTO rootGroupDto = FlowFromDOMFactory.getProcessGroup(null, rootGroupElement, encryptor);
- existingFlowEmpty = isEmpty(rootGroupDto);
+ existingFlowEmpty = taskElements.isEmpty() && controllerServiceElements.isEmpty() && isEmpty(rootGroupDto);
logger.debug("Existing Flow Empty = {}", existingFlowEmpty);
}
}
@@ -200,37 +230,64 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
// create document by parsing proposed flow bytes
logger.trace("Parsing proposed flow bytes as DOM document");
final Document configuration = parseFlowBytes(proposedFlow.getFlow());
-
+
// attempt to sync controller with proposed flow
try {
if (configuration != null) {
- // get the root element
- final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
-
- // set controller config
- logger.trace("Updating flow config");
- final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
- if (maxThreadCount == null) {
- controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount"));
- controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount"));
- } else {
- controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
- controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
- }
-
- // get the root group XML element
- final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
-
- // if this controller isn't initialized or its emtpy, add the root group, otherwise update
- if (!initialized || existingFlowEmpty) {
- logger.trace("Adding root process group");
- addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
- } else {
- logger.trace("Updating root process group");
- updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
+ synchronized (configuration) {
+ // get the root element
+ final Element rootElement = (Element) configuration.getElementsByTagName("flowController").item(0);
+
+ // set controller config
+ logger.trace("Updating flow config");
+ final Integer maxThreadCount = getInteger(rootElement, "maxThreadCount");
+ if (maxThreadCount == null) {
+ controller.setMaxTimerDrivenThreadCount(getInt(rootElement, "maxTimerDrivenThreadCount"));
+ controller.setMaxEventDrivenThreadCount(getInt(rootElement, "maxEventDrivenThreadCount"));
+ } else {
+ controller.setMaxTimerDrivenThreadCount(maxThreadCount * 2 / 3);
+ controller.setMaxEventDrivenThreadCount(maxThreadCount / 3);
+ }
+
+ // get the root group XML element
+ final Element rootGroupElement = (Element) rootElement.getElementsByTagName("rootGroup").item(0);
+
+ final Element controllerServicesElement = (Element) DomUtils.getChild(rootElement, "controllerServices");
+ if ( controllerServicesElement != null ) {
+ final List<Element> serviceElements = DomUtils.getChildElementsByTagName(controllerServicesElement, "controllerService");
+
+ if ( !initialized || existingFlowEmpty ) {
+ ControllerServiceLoader.loadControllerServices(serviceElements, controller, encryptor, controller.getBulletinRepository(), autoResumeState);
+ } else {
+ for ( final Element serviceElement : serviceElements ) {
+ updateControllerService(controller, serviceElement, encryptor);
+ }
+ }
+ }
+
+ // if this controller isn't initialized or it's empty, add the root group, otherwise update
+ if (!initialized || existingFlowEmpty) {
+ logger.trace("Adding root process group");
+ addProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
+ } else {
+ logger.trace("Updating root process group");
+ updateProcessGroup(controller, /* parent group */ null, rootGroupElement, encryptor);
+ }
+
+ final Element reportingTasksElement = (Element) DomUtils.getChild(rootElement, "reportingTasks");
+ if ( reportingTasksElement != null ) {
+ final List<Element> taskElements = DomUtils.getChildElementsByTagName(reportingTasksElement, "reportingTask");
+ for ( final Element taskElement : taskElements ) {
+ if ( !initialized || existingFlowEmpty ) {
+ addReportingTask(controller, taskElement, encryptor);
+ } else {
+ updateReportingTask(controller, taskElement, encryptor);
+ }
+ }
+ }
}
}
-
+
logger.trace("Synching templates");
if ((existingTemplates == null || existingTemplates.length == 0) && proposedFlow.getTemplates() != null && proposedFlow.getTemplates().length > 0) {
// need to load templates
@@ -313,7 +370,124 @@ public class StandardFlowSynchronizer implements FlowSynchronizer {
return baos.toByteArray();
}
+
+
+    /**
+     * Brings an already-loaded controller service in line with the enabled/disabled
+     * state recorded for it in the proposed flow.
+     * <p>
+     * Both {@code ENABLED} and {@code ENABLING} count as "enabled"; every other state
+     * counts as "not enabled". Nothing is done when the existing service and the
+     * proposed flow already agree.
+     *
+     * @param controller the controller that owns the service and performs the enable/disable
+     * @param controllerServiceElement the {@code controllerService} DOM element taken from the proposed flow
+     * @param encryptor handed to {@link FlowFromDOMFactory} when the element is parsed
+     */
+    private void updateControllerService(final FlowController controller, final Element controllerServiceElement, final StringEncryptor encryptor) {
+        final ControllerServiceDTO dto = FlowFromDOMFactory.getControllerService(controllerServiceElement, encryptor);
+
+        final ControllerServiceState dtoState = ControllerServiceState.valueOf(dto.getState());
+        final boolean dtoEnabled = (dtoState == ControllerServiceState.ENABLED || dtoState == ControllerServiceState.ENABLING);
+
+        // Resolve the node once; the same instance is inspected and then enabled/disabled,
+        // instead of being looked up again for every call.
+        // NOTE(review): assumes the service already exists in the controller -- a missing
+        // service would surface as a NullPointerException below; confirm against callers.
+        final ControllerServiceNode serviceNode = controller.getControllerServiceNode(dto.getId());
+        final ControllerServiceState serviceState = serviceNode.getState();
+        final boolean serviceEnabled = (serviceState == ControllerServiceState.ENABLED || serviceState == ControllerServiceState.ENABLING);
+
+        if (dtoEnabled && !serviceEnabled) {
+            controller.enableControllerService(serviceNode);
+        } else if (!dtoEnabled && serviceEnabled) {
+            controller.disableControllerService(serviceNode);
+        }
+    }
+
+    /**
+     * Creates the reporting task described by the given DOM element, copies its
+     * configuration (name, comments, scheduling period and strategy, annotation data,
+     * properties) from the parsed DTO, initializes it, and -- only when auto-resume is
+     * turned on -- restores a {@code RUNNING} or {@code DISABLED} state.
+     * <p>
+     * A failure while starting or disabling the task is not propagated: it is logged
+     * and published as an ERROR bulletin in the "Reporting Tasks" category.
+     *
+     * @param controller the controller that creates, starts and disables the task
+     * @param reportingTaskElement the {@code reportingTask} DOM element taken from the flow
+     * @param encryptor handed to {@link FlowFromDOMFactory} when the element is parsed
+     * @throws ReportingTaskInstantiationException if initializing the task fails with an
+     *             {@link InitializationException} (NOTE(review): task creation itself may
+     *             also throw this -- not visible from here)
+     */
+    private void addReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) throws ReportingTaskInstantiationException {
+        final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+        final String taskId = dto.getId();
+
+        // Create the node and copy the basic configuration across from the DTO.
+        final ReportingTaskNode taskNode = controller.createReportingTask(dto.getType(), taskId, false);
+        taskNode.setName(dto.getName());
+        taskNode.setComments(dto.getComments());
+        taskNode.setScheduldingPeriod(dto.getSchedulingPeriod());
+        final SchedulingStrategy strategy = SchedulingStrategy.valueOf(dto.getSchedulingStrategy());
+        taskNode.setSchedulingStrategy(strategy);
+
+        taskNode.setAnnotationData(dto.getAnnotationData());
+
+        // A null value removes the property; any other value sets it.
+        for (final Map.Entry<String, String> property : dto.getProperties().entrySet()) {
+            final String propertyName = property.getKey();
+            final String propertyValue = property.getValue();
+            if (propertyValue != null) {
+                taskNode.setProperty(propertyName, propertyValue);
+            } else {
+                taskNode.removeProperty(propertyName);
+            }
+        }
+
+        // Initialize the underlying task with a context built from the same DTO values.
+        final ComponentLog taskLog = new SimpleProcessLogger(taskId, taskNode.getReportingTask());
+        final ReportingInitializationContext initContext = new StandardReportingInitializationContext(taskId, dto.getName(),
+                strategy, dto.getSchedulingPeriod(), taskLog, controller);
+
+        try {
+            taskNode.getReportingTask().initialize(initContext);
+        } catch (final InitializationException ie) {
+            throw new ReportingTaskInstantiationException("Failed to initialize reporting task of type " + dto.getType(), ie);
+        }
+
+        // The recorded state is only restored when auto-resume is enabled.
+        if ( !autoResumeState ) {
+            return;
+        }
+
+        final String recordedState = dto.getState();
+        if ( ScheduledState.RUNNING.name().equals(recordedState) ) {
+            try {
+                controller.startReportingTask(taskNode);
+            } catch (final Exception e) {
+                logger.error("Failed to start {} due to {}", taskNode, e);
+                if ( logger.isDebugEnabled() ) {
+                    logger.error("", e);
+                }
+                controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+                        "Reporting Tasks", Severity.ERROR.name(), "Failed to start " + taskNode + " due to " + e));
+            }
+        } else if ( ScheduledState.DISABLED.name().equals(recordedState) ) {
+            try {
+                controller.disableReportingTask(taskNode);
+            } catch (final Exception e) {
+                logger.error("Failed to mark {} as disabled due to {}", taskNode, e);
+                if ( logger.isDebugEnabled() ) {
+                    logger.error("", e);
+                }
+                controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin(
+                        "Reporting Tasks", Severity.ERROR.name(), "Failed to mark " + taskNode + " as disabled due to " + e));
+            }
+        }
+    }
+    /**
+     * Moves an existing reporting task to the scheduled state recorded for it in the
+     * proposed flow. Nothing is done when the task is already in that state.
+     * <ul>
+     * <li>{@code DISABLED}: the task is stopped first if it is running, then disabled.</li>
+     * <li>{@code RUNNING}: the task is enabled first if it is disabled, then started.</li>
+     * <li>{@code STOPPED}: a disabled task is enabled; a running task is stopped.</li>
+     * </ul>
+     * An {@link IllegalStateException} raised by any of those transitions is logged and
+     * published as a single ERROR bulletin in the "Node Reconnection" category; it is
+     * not rethrown.
+     *
+     * @param controller the controller that owns the task and performs the transitions
+     * @param reportingTaskElement the {@code reportingTask} DOM element taken from the proposed flow
+     * @param encryptor handed to {@link FlowFromDOMFactory} when the element is parsed
+     */
+    private void updateReportingTask(final FlowController controller, final Element reportingTaskElement, final StringEncryptor encryptor) {
+        final ReportingTaskDTO dto = FlowFromDOMFactory.getReportingTask(reportingTaskElement, encryptor);
+        final ReportingTaskNode taskNode = controller.getReportingTaskNode(dto.getId());
+
+        // Already in the desired state: nothing to change.
+        if (taskNode.getScheduledState().name().equals(dto.getState())) {
+            return;
+        }
+
+        try {
+            switch (ScheduledState.valueOf(dto.getState())) {
+                case DISABLED:
+                    if ( taskNode.isRunning() ) {
+                        controller.stopReportingTask(taskNode);
+                    }
+                    controller.disableReportingTask(taskNode);
+                    break;
+                case RUNNING:
+                    if ( taskNode.getScheduledState() == ScheduledState.DISABLED ) {
+                        controller.enableReportingTask(taskNode);
+                    }
+                    controller.startReportingTask(taskNode);
+                    break;
+                case STOPPED:
+                    if (taskNode.getScheduledState() == ScheduledState.DISABLED) {
+                        controller.enableReportingTask(taskNode);
+                    } else if (taskNode.getScheduledState() == ScheduledState.RUNNING) {
+                        controller.stopReportingTask(taskNode);
+                    }
+                    break;
+            }
+        } catch (final IllegalStateException ise) {
+            logger.error("Failed to change Scheduled State of {} from {} to {} due to {}", taskNode, taskNode.getScheduledState().name(), dto.getState(), ise.toString());
+            logger.error("", ise);
+
+            // Publish one controller-level bulletin for the reporting task. The same
+            // bulletin used to be added twice in a row, so the failure showed up as a
+            // duplicate entry.
+            controller.getBulletinRepository().addBulletin(BulletinFactory.createBulletin("Node Reconnection", Severity.ERROR.name(),
+                    "Failed to change Scheduled State of " + taskNode + " from " + taskNode.getScheduledState().name() + " to " + dto.getState() + " due to " + ise.toString()));
+        }
+    }
+
+
private ProcessGroup updateProcessGroup(final FlowController controller, final ProcessGroup parentGroup, final Element processGroupElement, final StringEncryptor encryptor) throws ProcessorInstantiationException {
// get the parent group ID
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e9647717/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
index fe72ae4..355e303 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardProcessorNode.java
@@ -52,6 +52,7 @@ import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Position;
+import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.logging.LogLevel;
@@ -120,7 +121,7 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
private SchedulingStrategy schedulingStrategy; // guarded by read/write lock
@SuppressWarnings("deprecation")
- StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
+ public StandardProcessorNode(final Processor processor, final String uuid, final ValidationContextFactory validationContextFactory,
final ProcessScheduler scheduler, final ControllerServiceProvider controllerServiceProvider) {
super(processor, uuid, validationContextFactory, controllerServiceProvider);
@@ -985,6 +986,16 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
readLock.unlock();
}
}
+
+    /**
+     * Returns the active thread count that the process scheduler reports for this
+     * node. The scheduler is queried while the read lock is held.
+     *
+     * @return the value of {@code processScheduler.getActiveThreadCount(this)}
+     */
+    @Override
+    public int getActiveThreadCount() {
+        readLock.lock();
+        try {
+            final int activeThreads = processScheduler.getActiveThreadCount(this);
+            return activeThreads;
+        } finally {
+            // Always release the lock, even if the scheduler call throws.
+            readLock.unlock();
+        }
+    }
@Override
public boolean isValid() {
@@ -1182,8 +1193,13 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
public void verifyCanStart() {
readLock.lock();
try {
- if (scheduledState.get() != ScheduledState.STOPPED) {
- throw new IllegalStateException(this + " is not stopped");
+ switch (getScheduledState()) {
+ case DISABLED:
+ throw new IllegalStateException(this + " cannot be started because it is disabled");
+ case RUNNING:
+ throw new IllegalStateException(this + " cannot be started because it is already running");
+ case STOPPED:
+ break;
}
verifyNoActiveThreads();
@@ -1194,6 +1210,31 @@ public class StandardProcessorNode extends ProcessorNode implements Connectable
readLock.unlock();
}
}
+
+    /**
+     * Verifies that this processor may be started, handing the identifiers of the
+     * given controller services to validation separately.
+     * <p>
+     * The checks run in this order: the scheduled state must be {@code STOPPED},
+     * {@code verifyNoActiveThreads()} must pass, and none of the results returned by
+     * {@code getValidationErrors(ids)} may be invalid.
+     *
+     * @param ignoredReferences controller services whose identifiers are collected and passed to
+     *            {@code getValidationErrors} (presumably so that they are not validated -- confirm
+     *            against that method)
+     * @throws IllegalStateException if the processor is disabled, is already running, or
+     *             has a validation result that is not valid
+     */
+    @Override
+    public void verifyCanStart(final Set<ControllerServiceNode> ignoredReferences) {
+        switch (getScheduledState()) {
+            case STOPPED:
+                // The only state from which a start is allowed.
+                break;
+            case RUNNING:
+                throw new IllegalStateException(this + " cannot be started because it is already running");
+            case DISABLED:
+                throw new IllegalStateException(this + " cannot be started because it is disabled");
+        }
+        verifyNoActiveThreads();
+
+        // Validation works with service identifiers rather than the nodes themselves.
+        final Set<String> ignoredServiceIds = new HashSet<>();
+        for ( final ControllerServiceNode ignoredService : ignoredReferences ) {
+            ignoredServiceIds.add(ignoredService.getIdentifier());
+        }
+
+        // Fail on the first result that is not valid.
+        for ( final ValidationResult validationResult : getValidationErrors(ignoredServiceIds) ) {
+            if ( validationResult.isValid() ) {
+                continue;
+            }
+            throw new IllegalStateException(this + " cannot be started because it is not valid: " + validationResult);
+        }
+    }
@Override
public void verifyCanStop() {