You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2015/04/22 23:13:22 UTC
[32/49] incubator-nifi git commit: NIFI-271 checkpoint
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index d8f1338..a45bf76 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -61,9 +61,6 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- *
- */
public class StandardControllerServiceProvider implements ControllerServiceProvider {
private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
@@ -112,24 +109,24 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
populateInterfaces(superClass, interfacesDefinedThusFar);
}
}
-
+
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
if (type == null || id == null) {
throw new NullPointerException();
}
-
+
final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
final ClassLoader cl = ExtensionManager.getClassLoader(type);
final Class<?> rawClass;
- if ( cl == null ) {
+ if (cl == null) {
rawClass = Class.forName(type);
} else {
Thread.currentThread().setContextClassLoader(cl);
rawClass = Class.forName(type, false, cl);
}
-
+
final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
final ControllerService originalService = controllerServiceClass.newInstance();
@@ -138,11 +135,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
@Override
public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
- final String methodName = method.getName();
- if("initialize".equals(methodName) || "onPropertyModified".equals(methodName)){
- throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework");
- }
-
+ final String methodName = method.getName();
+ if ("initialize".equals(methodName) || "onPropertyModified".equals(methodName)) {
+ throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework");
+ }
+
final ControllerServiceNode node = serviceNodeHolder.get();
final ControllerServiceState state = node.getState();
final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED.
@@ -166,7 +163,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
};
final ControllerService proxiedService;
- if ( cl == null ) {
+ if (cl == null) {
proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
} else {
proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
@@ -181,8 +178,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
serviceNodeHolder.set(serviceNode);
serviceNode.setName(rawClass.getSimpleName());
-
- if ( firstTimeAdded ) {
+
+ if (firstTimeAdded) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
} catch (final Exception e) {
@@ -200,226 +197,227 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
}
-
-
-
+
@Override
public void disableReferencingServices(final ControllerServiceNode serviceNode) {
// Get a list of all Controller Services that need to be disabled, in the order that they need to be
// disabled.
final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
-
- for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+
+ for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
-
- if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+
+ if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
nodeToDisable.verifyCanDisable(serviceSet);
}
}
-
+
Collections.reverse(toDisable);
- for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+ for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
-
- if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+
+ if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
disableControllerService(nodeToDisable);
}
}
}
-
-
+
@Override
public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-
+
// verify that we can start all components (that are not disabled) before doing anything
- for ( final ProcessorNode node : processors ) {
- if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ for (final ProcessorNode node : processors) {
+ if (node.getScheduledState() != ScheduledState.DISABLED) {
node.verifyCanStart();
}
}
- for ( final ReportingTaskNode node : reportingTasks ) {
- if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ for (final ReportingTaskNode node : reportingTasks) {
+ if (node.getScheduledState() != ScheduledState.DISABLED) {
node.verifyCanStart();
}
}
-
+
// start all of the components that are not disabled
- for ( final ProcessorNode node : processors ) {
- if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ for (final ProcessorNode node : processors) {
+ if (node.getScheduledState() != ScheduledState.DISABLED) {
node.getProcessGroup().startProcessor(node);
}
}
- for ( final ReportingTaskNode node : reportingTasks ) {
- if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+ for (final ReportingTaskNode node : reportingTasks) {
+ if (node.getScheduledState() != ScheduledState.DISABLED) {
processScheduler.schedule(node);
}
}
}
-
+
@Override
public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
// find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
// or a service that references this controller service, etc.
final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-
+
// verify that we can stop all components (that are running) before doing anything
- for ( final ProcessorNode node : processors ) {
- if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ for (final ProcessorNode node : processors) {
+ if (node.getScheduledState() == ScheduledState.RUNNING) {
node.verifyCanStop();
}
}
- for ( final ReportingTaskNode node : reportingTasks ) {
- if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ for (final ReportingTaskNode node : reportingTasks) {
+ if (node.getScheduledState() == ScheduledState.RUNNING) {
node.verifyCanStop();
}
}
-
+
// stop all of the components that are running
- for ( final ProcessorNode node : processors ) {
- if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ for (final ProcessorNode node : processors) {
+ if (node.getScheduledState() == ScheduledState.RUNNING) {
node.getProcessGroup().stopProcessor(node);
}
}
- for ( final ReportingTaskNode node : reportingTasks ) {
- if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+ for (final ReportingTaskNode node : reportingTasks) {
+ if (node.getScheduledState() == ScheduledState.RUNNING) {
processScheduler.unschedule(node);
}
}
}
-
+
@Override
public void enableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanEnable();
processScheduler.enableControllerService(serviceNode);
}
-
+
@Override
public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
// Ensure that all nodes are already disabled
- for ( final ControllerServiceNode serviceNode : serviceNodes ) {
+ for (final ControllerServiceNode serviceNode : serviceNodes) {
final ControllerServiceState curState = serviceNode.getState();
- if ( ControllerServiceState.DISABLED.equals(curState) ) {
+ if (ControllerServiceState.DISABLED.equals(curState)) {
servicesToEnable.add(serviceNode);
} else {
logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
}
}
-
+
// determine the order to load the services. We have to ensure that if service A references service B, then B
// is enabled first, and so on.
final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
- for ( final ControllerServiceNode node : servicesToEnable ) {
+ for (final ControllerServiceNode node : servicesToEnable) {
idToNodeMap.put(node.getIdentifier(), node);
}
-
+
// We can have many Controller Services dependent on one another. We can have many of these
// disparate lists of Controller Services that are dependent on one another. We refer to each
// of these as a branch.
final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
- if ( branches.isEmpty() ) {
+ if (branches.isEmpty()) {
logger.info("No Controller Services to enable");
return;
} else {
logger.info("Will enable {} Controller Services", servicesToEnable.size());
}
-
+
// Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
// to be valid so that they can be scheduled.
- for ( final List<ControllerServiceNode> branch : branches ) {
- for ( final ControllerServiceNode nodeToEnable : branch ) {
+ for (final List<ControllerServiceNode> branch : branches) {
+ for (final ControllerServiceNode nodeToEnable : branch) {
nodeToEnable.setState(ControllerServiceState.ENABLING);
}
}
-
+
final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
- for ( final List<ControllerServiceNode> branch : branches ) {
+ for (final List<ControllerServiceNode> branch : branches) {
final Runnable enableBranchRunnable = new Runnable() {
@Override
public void run() {
logger.debug("Enabling Controller Service Branch {}", branch);
-
- for ( final ControllerServiceNode serviceNode : branch ) {
+
+ for (final ControllerServiceNode serviceNode : branch) {
try {
- if ( !enabledNodes.contains(serviceNode) ) {
+ if (!enabledNodes.contains(serviceNode)) {
enabledNodes.add(serviceNode);
-
+
logger.info("Enabling {}", serviceNode);
try {
processScheduler.enableControllerService(serviceNode);
} catch (final Exception e) {
logger.error("Failed to enable " + serviceNode + " due to " + e);
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.error("", e);
}
-
- if ( bulletinRepo != null ) {
+
+ if (bulletinRepo != null) {
bulletinRepo.addBulletin(BulletinFactory.createBulletin(
- "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+ "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
}
}
}
-
+
// wait for service to finish enabling.
- while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+ while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) {
try {
Thread.sleep(100L);
- } catch (final InterruptedException ie) {}
+ } catch (final InterruptedException ie) {
+ }
}
-
+
logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
} catch (final Exception e) {
logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
- if ( logger.isDebugEnabled() ) {
+ if (logger.isDebugEnabled()) {
logger.error("", e);
}
}
}
}
};
-
+
executor.submit(enableBranchRunnable);
}
-
+
executor.shutdown();
}
-
+
static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
-
- for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
- if ( orderedNodeLists.contains(node) ) {
+
+ for (final ControllerServiceNode node : serviceNodeMap.values()) {
+ if (orderedNodeLists.contains(node)) {
continue; // this node is already in the list.
}
-
+
final List<ControllerServiceNode> branch = new ArrayList<>();
determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
orderedNodeLists.add(branch);
}
-
+
return orderedNodeLists;
}
-
-
- private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) {
- if ( visited.contains(contextNode) ) {
+
+ private static void determineEnablingOrder(
+ final Map<String, ControllerServiceNode> serviceNodeMap,
+ final ControllerServiceNode contextNode,
+ final List<ControllerServiceNode> orderedNodes,
+ final Set<ControllerServiceNode> visited) {
+ if (visited.contains(contextNode)) {
return;
}
-
- for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) {
- if ( entry.getKey().getControllerServiceDefinition() != null ) {
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) {
+ if (entry.getKey().getControllerServiceDefinition() != null) {
final String referencedServiceId = entry.getValue();
- if ( referencedServiceId != null ) {
+ if (referencedServiceId != null) {
final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
- if ( !orderedNodes.contains(referencedNode) ) {
+ if (!orderedNodes.contains(referencedNode)) {
visited.add(contextNode);
determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
}
@@ -427,12 +425,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
}
}
- if ( !orderedNodes.contains(contextNode) ) {
+ if (!orderedNodes.contains(contextNode)) {
orderedNodes.add(contextNode);
}
}
-
-
+
@Override
public void disableControllerService(final ControllerServiceNode serviceNode) {
serviceNode.verifyCanDisable();
@@ -461,7 +458,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
return (node == null) ? false : (ControllerServiceState.ENABLING == node.getState());
}
-
+
@Override
public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
return controllerServices.get(serviceIdentifier);
@@ -478,157 +475,158 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
return identifiers;
}
-
+
@Override
public String getControllerServiceName(final String serviceIdentifier) {
- final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
- return node == null ? null : node.getName();
+ final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
+ return node == null ? null : node.getName();
}
-
+
+ @Override
public void removeControllerService(final ControllerServiceNode serviceNode) {
final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
- if ( existing == null || existing != serviceNode ) {
+ if (existing == null || existing != serviceNode) {
throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
}
-
+
serviceNode.verifyCanDelete();
-
+
try (final NarCloseable x = NarCloseable.withNarLoader()) {
final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
}
-
- for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
+
+ for (final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet()) {
final PropertyDescriptor descriptor = entry.getKey();
- if (descriptor.getControllerServiceDefinition() != null ) {
+ if (descriptor.getControllerServiceDefinition() != null) {
final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
- if ( value != null ) {
+ if (value != null) {
final ControllerServiceNode referencedNode = getControllerServiceNode(value);
- if ( referencedNode != null ) {
+ if (referencedNode != null) {
referencedNode.removeReference(serviceNode);
}
}
}
}
-
+
controllerServices.remove(serviceNode.getIdentifier());
}
-
+
@Override
public Set<ControllerServiceNode> getAllControllerServices() {
- return new HashSet<>(controllerServices.values());
+ return new HashSet<>(controllerServices.values());
}
-
-
+
/**
- * Returns a List of all components that reference the given referencedNode (either directly or indirectly through
- * another service) that are also of the given componentType. The list that is returned is in the order in which they will
- * need to be 'activated' (enabled/started).
- * @param referencedNode
- * @param componentType
- * @return
+ * Returns a List of all components that reference the given referencedNode
+ * (either directly or indirectly through another service) that are also of
+ * the given componentType. The list that is returned is in the order in
+ * which they will need to be 'activated' (enabled/started).
+ *
+ * @param referencedNode node
+ * @param componentType type
+ * @return list of components
*/
private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
final List<T> references = new ArrayList<>();
-
- for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) {
- if ( componentType.isAssignableFrom(referencingComponent.getClass()) ) {
+
+ for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
+ if (componentType.isAssignableFrom(referencingComponent.getClass())) {
references.add(componentType.cast(referencingComponent));
}
-
- if ( referencingComponent instanceof ControllerServiceNode ) {
+
+ if (referencingComponent instanceof ControllerServiceNode) {
final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
-
+
// find components recursively that depend on referencingNode.
final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
-
+
// For anything that depends on referencing node, we want to add it to the list, but we know
// that it must come after the referencing node, so we first remove any existing occurrence.
references.removeAll(recursive);
references.addAll(recursive);
}
}
-
+
return references;
}
-
@Override
public void enableReferencingServices(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
enableReferencingServices(serviceNode, recursiveReferences);
}
-
+
private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
- if ( serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING ) {
+ if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) {
serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
}
-
+
final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
- for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+ for (final ControllerServiceNode nodeToEnable : toEnable) {
final ControllerServiceState state = nodeToEnable.getState();
- if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
nodeToEnable.verifyCanEnable(ifEnabled);
ifEnabled.add(nodeToEnable);
}
}
-
- for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+
+ for (final ControllerServiceNode nodeToEnable : toEnable) {
final ControllerServiceState state = nodeToEnable.getState();
- if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+ if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
enableControllerService(nodeToEnable);
}
}
}
-
+
@Override
public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
-
- for ( final ControllerServiceNode referencingService : referencingServices ) {
+
+ for (final ControllerServiceNode referencingService : referencingServices) {
referencingService.verifyCanEnable(referencingServiceSet);
}
}
-
+
@Override
public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
-
+
final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
-
- for ( final ReportingTaskNode taskNode : referencingReportingTasks ) {
- if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+
+ for (final ReportingTaskNode taskNode : referencingReportingTasks) {
+ if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
taskNode.verifyCanStart(referencingServiceSet);
}
}
-
- for ( final ProcessorNode procNode : referencingProcessors ) {
- if ( procNode.getScheduledState() != ScheduledState.DISABLED ) {
+
+ for (final ProcessorNode procNode : referencingProcessors) {
+ if (procNode.getScheduledState() != ScheduledState.DISABLED) {
procNode.verifyCanStart(referencingServiceSet);
}
}
}
-
+
@Override
public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
// Get a list of all Controller Services that need to be disabled, in the order that they need to be
// disabled.
final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
-
- for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+
+ for (final ControllerServiceNode nodeToDisable : toDisable) {
final ControllerServiceState state = nodeToDisable.getState();
-
- if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+
+ if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
nodeToDisable.verifyCanDisable(serviceSet);
}
}
}
-
+
@Override
public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
// we can always stop referencing components
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index c470b99..701adcf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -65,9 +65,9 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
for (final ConfiguredComponent component : components) {
if (component instanceof ControllerServiceNode) {
serviceNodes.add((ControllerServiceNode) component);
-
+
final ControllerServiceState state = ((ControllerServiceNode) component).getState();
- if ( state != ControllerServiceState.DISABLED ) {
+ if (state != ControllerServiceState.DISABLED) {
activeReferences.add(component);
}
} else if (isRunning(component)) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
index 89ac846..6970fce 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
@@ -31,7 +31,8 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
this(field, label, description, formatter, valueFunction, null);
}
- public StandardMetricDescriptor(final String field, final String label, final String description, final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
+ public StandardMetricDescriptor(final String field, final String label, final String description,
+ final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
this.field = field;
this.label = label;
this.description = description;
@@ -40,41 +41,21 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
this.reducer = reducer == null ? new SumReducer() : reducer;
}
- /**
- * The name of this status field.
- *
- * @return
- */
@Override
public String getField() {
return field;
}
- /**
- * The label of this status field.
- *
- * @return
- */
@Override
public String getLabel() {
return label;
}
- /**
- * The description of this status field.
- *
- * @return
- */
@Override
public String getDescription() {
return description;
}
- /**
- * The formatter for this descriptor.
- *
- * @return
- */
@Override
public MetricDescriptor.Formatter getFormatter() {
return formatter;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 0872192..d2a983a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -91,7 +91,8 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
@Override
public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final Date timestamp) {
- captures.add(new Capture(timestamp, ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR, ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP)));
+ captures.add(new Capture(timestamp, ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR,
+ ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP)));
logger.debug("Captured metrics for {}", this);
lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime());
}
@@ -269,48 +270,57 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
public static enum RemoteProcessGroupStatusDescriptor {
- SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return status.getSentContentSize();
- }
- })),
- SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getSentCount().longValue());
- }
- })),
- RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return status.getReceivedContentSize();
- }
- })),
- RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getReceivedCount().longValue());
- }
- })),
- RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
- }
- })),
- SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf(status.getSentContentSize().longValue() / 300L);
- }
- })),
- TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
- @Override
- public Long getValue(final RemoteProcessGroupStatus status) {
- return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
- }
- })),
+ SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)",
+ "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return status.getSentContentSize();
+ }
+ })),
+ SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)",
+ "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return Long.valueOf(status.getSentCount().longValue());
+ }
+ })),
+ RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)",
+ "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return status.getReceivedContentSize();
+ }
+ })),
+ RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)",
+ "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return Long.valueOf(status.getReceivedCount().longValue());
+ }
+ })),
+ RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second",
+ "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
+ Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
+ }
+ })),
+ SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second",
+ "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return Long.valueOf(status.getSentContentSize().longValue() / 300L);
+ }
+ })),
+ TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second",
+ "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
+ Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+ @Override
+ public Long getValue(final RemoteProcessGroupStatus status) {
+ return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
+ }
+ })),
AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
"averageLineageDuration",
"Average Lineage Duration (5 mins)",
@@ -358,66 +368,83 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
public static enum ProcessGroupStatusDescriptor {
- BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getBytesRead();
- }
- })),
- BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getBytesWritten();
- }
- })),
- BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getBytesRead() + status.getBytesWritten();
- }
- })),
- INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getInputContentSize();
- }
- })),
- INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getInputCount().longValue();
- }
- })),
- OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getOutputContentSize();
- }
- })),
- OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getOutputCount().longValue();
- }
- })),
- QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", "The cumulative size of all FlowFiles queued in all Connections of this Process Group", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getQueuedContentSize();
- }
- })),
- QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return status.getQueuedCount().longValue();
- }
- })),
- TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
- @Override
- public Long getValue(final ProcessGroupStatus status) {
- return calculateTaskMillis(status);
- }
- }));
+ BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
+ "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getBytesRead();
+ }
+ })),
+ BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)",
+ "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getBytesWritten();
+ }
+ })),
+ BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+ "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
+ Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getBytesRead() + status.getBytesWritten();
+ }
+ })),
+ INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
+ "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+ Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getInputContentSize();
+ }
+ })),
+ INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)",
+ "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+ Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getInputCount().longValue();
+ }
+ })),
+ OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)",
+ "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+ Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getOutputContentSize();
+ }
+ })),
+ OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)",
+ "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+ Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getOutputCount().longValue();
+ }
+ })),
+ QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+ "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
+ Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getQueuedContentSize();
+ }
+ })),
+ QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+ "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return status.getQueuedCount().longValue();
+ }
+ })),
+ TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)",
+ "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
+ Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+ @Override
+ public Long getValue(final ProcessGroupStatus status) {
+ return calculateTaskMillis(status);
+ }
+ }));
private MetricDescriptor<ProcessGroupStatus> descriptor;
@@ -436,42 +463,48 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
public static enum ConnectionStatusDescriptor {
- INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return status.getInputBytes();
- }
- })),
- INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return Long.valueOf(status.getInputCount());
- }
- })),
- OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return status.getOutputBytes();
- }
- })),
- OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return Long.valueOf(status.getOutputCount());
- }
- })),
- QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return status.getQueuedBytes();
- }
- })),
- QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
- @Override
- public Long getValue(final ConnectionStatus status) {
- return Long.valueOf(status.getQueuedCount());
- }
- }));
+ INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)",
+ "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+ @Override
+ public Long getValue(final ConnectionStatus status) {
+ return status.getInputBytes();
+ }
+ })),
+ INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)",
+ "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+ @Override
+ public Long getValue(final ConnectionStatus status) {
+ return Long.valueOf(status.getInputCount());
+ }
+ })),
+ OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)",
+ "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+ @Override
+ public Long getValue(final ConnectionStatus status) {
+ return status.getOutputBytes();
+ }
+ })),
+ OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)",
+ "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+ @Override
+ public Long getValue(final ConnectionStatus status) {
+ return Long.valueOf(status.getOutputCount());
+ }
+ })),
+ QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes",
+ "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+ @Override
+ public Long getValue(final ConnectionStatus status) {
+ return status.getQueuedBytes();
+ }
+ })),
+ QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count",
+ "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+ @Override
+ public Long getValue(final ConnectionStatus status) {
+ return Long.valueOf(status.getQueuedCount());
+ }
+ }));
private MetricDescriptor<ConnectionStatus> descriptor;
@@ -490,66 +523,76 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
public static enum ProcessorStatusDescriptor {
- BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getBytesRead();
- }
- })),
- BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getBytesWritten();
- }
- })),
- BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getBytesRead() + status.getBytesWritten();
- }
- })),
- INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getInputBytes();
- }
- })),
- INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getInputCount());
- }
- })),
- OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return status.getOutputBytes();
- }
- })),
- OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getOutputCount());
- }
- })),
- TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getInvocations());
- }
- })),
- TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
- }
- })),
- FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
- @Override
- public Long getValue(final ProcessorStatus status) {
- return Long.valueOf(status.getFlowFilesRemoved());
- }
- })),
+ BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)",
+ "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return status.getBytesRead();
+ }
+ })),
+ BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)",
+ "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return status.getBytesWritten();
+ }
+ })),
+ BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+ "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return status.getBytesRead() + status.getBytesWritten();
+ }
+ })),
+ INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)",
+ "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return status.getInputBytes();
+ }
+ })),
+ INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)",
+ "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return Long.valueOf(status.getInputCount());
+ }
+ })),
+ OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)",
+ "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return status.getOutputBytes();
+ }
+ })),
+ OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)",
+ "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return Long.valueOf(status.getOutputCount());
+ }
+ })),
+ TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes",
+ Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return Long.valueOf(status.getInvocations());
+ }
+ })),
+ TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)",
+ "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
+ }
+ })),
+ FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)",
+ "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+ @Override
+ public Long getValue(final ProcessorStatus status) {
+ return Long.valueOf(status.getFlowFilesRemoved());
+ }
+ })),
AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
"averageLineageDuration",
"Average Lineage Duration (5 mins)",
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index 5ecd22e..f3cbb90 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -35,8 +35,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return
- * <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise.
+ * Continually runs a Connectable as long as the processor has work to do.
+ * {@link #call()} will return <code>true</code> if the Connectable should be
+ * yielded, <code>false</code> otherwise.
*/
public class ContinuallyRunConnectableTask implements Callable<Boolean> {
@@ -60,7 +61,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
if (!scheduleState.isScheduled()) {
return false;
}
-
+
// Connectable should run if the following conditions are met:
// 1. It is not yielded.
// 2. It has incoming connections with FlowFiles queued or doesn't expect incoming connections
@@ -106,7 +107,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
// yield for just a bit.
return true;
}
-
- return false; // do not yield
+
+ return false; // do not yield
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index cff8744..baed6ae 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -43,10 +43,10 @@ import org.apache.nifi.util.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
- * Continually runs a processor as long as the processor has work to do. {@link #call()} will return
- * <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
+ * Continually runs a processor as long as the processor has work to do.
+ * {@link #call()} will return <code>true</code> if the processor should be
+ * yielded, <code>false</code> otherwise.
*/
public class ContinuallyRunProcessorTask implements Callable<Boolean> {
@@ -61,7 +61,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
private final int numRelationships;
public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
- final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState,
+ final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState,
final StandardProcessContext processContext) {
this.schedulingAgent = schedulingAgent;
@@ -163,9 +163,9 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
if (batch) {
rawSession.commit();
}
-
+
final long processingNanos = System.nanoTime() - startNanos;
-
+
// if the processor is no longer scheduled to run and this is the last thread,
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
@@ -174,7 +174,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
flowController.heartbeat();
}
}
-
+
try {
final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
procEvent.setProcessingNanos(processingNanos);
@@ -188,7 +188,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
scheduleState.decrementActiveThreadCount();
}
}
-
+
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index 0c472c8..5724bb4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@ -42,7 +42,7 @@ public class ReportingTaskWrapper implements Runnable {
taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
} catch (final Throwable t) {
final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
- componentLog.error("Error running task {} due to {}", new Object[] {taskNode.getReportingTask(), t.toString()});
+ componentLog.error("Error running task {} due to {}", new Object[]{taskNode.getReportingTask(), t.toString()});
if (componentLog.isDebugEnabled()) {
componentLog.error("", t);
}
@@ -52,7 +52,9 @@ public class ReportingTaskWrapper implements Runnable {
// invoke the OnStopped methods
if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
try (final NarCloseable x = NarCloseable.withNarLoader()) {
- ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
+ ReflectionUtils.quietlyInvokeMethodsWithAnnotation(
+ OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class,
+ taskNode.getReportingTask(), taskNode.getConfigurationContext());
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
index be79c5b..fccd10e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
@@ -76,7 +76,7 @@ public final class StringEncryptor {
* Creates an instance of the nifi sensitive property encryptor. Validates
* that the encryptor is actually working.
*
- * @return
+ * @return encryptor
* @throws EncryptionException if any issues arise initializing or
* validating the encryptor
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index 76e8e3e..3be178f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -27,9 +27,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
-/**
- * @author unattributed
- */
public final class FlowEngine extends ScheduledThreadPoolExecutor {
private static final Logger logger = LoggerFactory.getLogger(FlowEngine.class);
@@ -39,19 +36,20 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
*
* @param corePoolSize the maximum number of threads available to tasks
* running in the engine.
- * @param threadNamePrefix
+ * @param threadNamePrefix for naming the thread
*/
public FlowEngine(int corePoolSize, final String threadNamePrefix) {
- this(corePoolSize, threadNamePrefix, false);
+ this(corePoolSize, threadNamePrefix, false);
}
-
+
/**
* Creates a new instance of FlowEngine
*
* @param corePoolSize the maximum number of threads available to tasks
* running in the engine.
- * @param threadNamePrefix
- * @param deamon if true, the thread pool will be populated with daemon threads, otherwise the threads will not be marked as daemon.
+ * @param threadNamePrefix for thread naming
+ * @param daemon if true, the thread pool will be populated with daemon
+ * threads, otherwise the threads will not be marked as daemon.
*/
public FlowEngine(int corePoolSize, final String threadNamePrefix, final boolean daemon) {
super(corePoolSize);
@@ -62,8 +60,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
@Override
public Thread newThread(final Runnable r) {
final Thread t = defaultThreadFactory.newThread(r);
- if ( daemon ) {
- t.setDaemon(true);
+ if (daemon) {
+ t.setDaemon(true);
}
t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet());
return t;
@@ -75,8 +73,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
* Hook method called by the running thread whenever a runnable task is
* given to the thread to run.
*
- * @param thread
- * @param runnable
+ * @param thread thread
+ * @param runnable runnable
*/
@Override
protected void beforeExecute(final Thread thread, final Runnable runnable) {
@@ -90,8 +88,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
* execution of the runnable completed. Logs the fact of completion and any
* errors that might have occured.
*
- * @param runnable
- * @param throwable
+ * @param runnable runnable
+ * @param throwable throwable
*/
@Override
protected void afterExecute(final Runnable runnable, final Throwable throwable) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 044541b..e8708bd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -180,7 +180,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
* bulletin strategy is employed, bulletins will not be persisted in this
* repository and will sent to the specified strategy instead.
*
- * @param strategy
+ * @param strategy bulletin strategy
*/
public void overrideDefaultBulletinProcessing(final BulletinProcessingStrategy strategy) {
Objects.requireNonNull(strategy);