You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/22 17:46:46 UTC

[03/11] incubator-nifi git commit: NIFI-271 checkpoint

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
index d8f1338..a45bf76 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceProvider.java
@@ -61,9 +61,6 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-/**
- *
- */
 public class StandardControllerServiceProvider implements ControllerServiceProvider {
 
     private static final Logger logger = LoggerFactory.getLogger(StandardControllerServiceProvider.class);
@@ -112,24 +109,24 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             populateInterfaces(superClass, interfacesDefinedThusFar);
         }
     }
-    
+
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
         if (type == null || id == null) {
             throw new NullPointerException();
         }
-        
+
         final ClassLoader currentContextClassLoader = Thread.currentThread().getContextClassLoader();
         try {
             final ClassLoader cl = ExtensionManager.getClassLoader(type);
             final Class<?> rawClass;
-            if ( cl == null ) {
+            if (cl == null) {
                 rawClass = Class.forName(type);
             } else {
                 Thread.currentThread().setContextClassLoader(cl);
                 rawClass = Class.forName(type, false, cl);
             }
-            
+
             final Class<? extends ControllerService> controllerServiceClass = rawClass.asSubclass(ControllerService.class);
 
             final ControllerService originalService = controllerServiceClass.newInstance();
@@ -138,11 +135,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
                 @Override
                 public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
 
-                	final String methodName = method.getName();
-                	if("initialize".equals(methodName) || "onPropertyModified".equals(methodName)){
-                		throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework");
-                	}
-                	
+                    final String methodName = method.getName();
+                    if ("initialize".equals(methodName) || "onPropertyModified".equals(methodName)) {
+                        throw new UnsupportedOperationException(method + " may only be invoked by the NiFi framework");
+                    }
+
                     final ControllerServiceNode node = serviceNodeHolder.get();
                     final ControllerServiceState state = node.getState();
                     final boolean disabled = (state != ControllerServiceState.ENABLED); // only allow method call if service state is ENABLED.
@@ -166,7 +163,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             };
 
             final ControllerService proxiedService;
-            if ( cl == null ) {
+            if (cl == null) {
                 proxiedService = (ControllerService) Proxy.newProxyInstance(getClass().getClassLoader(), getInterfaces(controllerServiceClass), invocationHandler);
             } else {
                 proxiedService = (ControllerService) Proxy.newProxyInstance(cl, getInterfaces(controllerServiceClass), invocationHandler);
@@ -181,8 +178,8 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             final ControllerServiceNode serviceNode = new StandardControllerServiceNode(proxiedService, originalService, id, validationContextFactory, this);
             serviceNodeHolder.set(serviceNode);
             serviceNode.setName(rawClass.getSimpleName());
-            
-            if ( firstTimeAdded ) {
+
+            if (firstTimeAdded) {
                 try (final NarCloseable x = NarCloseable.withNarLoader()) {
                     ReflectionUtils.invokeMethodsWithAnnotation(OnAdded.class, originalService);
                 } catch (final Exception e) {
@@ -200,226 +197,227 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             }
         }
     }
-    
-    
-    
+
     @Override
     public void disableReferencingServices(final ControllerServiceNode serviceNode) {
         // Get a list of all Controller Services that need to be disabled, in the order that they need to be
         // disabled.
         final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
-        
-        for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+
+        for (final ControllerServiceNode nodeToDisable : toDisable) {
             final ControllerServiceState state = nodeToDisable.getState();
-            
-            if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+
+            if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
                 nodeToDisable.verifyCanDisable(serviceSet);
             }
         }
-        
+
         Collections.reverse(toDisable);
-        for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+        for (final ControllerServiceNode nodeToDisable : toDisable) {
             final ControllerServiceState state = nodeToDisable.getState();
-            
-            if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+
+            if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
                 disableControllerService(nodeToDisable);
             }
         }
     }
-    
-    
+
     @Override
     public void scheduleReferencingComponents(final ControllerServiceNode serviceNode) {
         // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
         final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
         final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-        
+
         // verify that  we can start all components (that are not disabled) before doing anything
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.verifyCanStart();
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.verifyCanStart();
             }
         }
-        
+
         // start all of the components that are not disabled
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 node.getProcessGroup().startProcessor(node);
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() != ScheduledState.DISABLED ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() != ScheduledState.DISABLED) {
                 processScheduler.schedule(node);
             }
         }
     }
-    
+
     @Override
     public void unscheduleReferencingComponents(final ControllerServiceNode serviceNode) {
         // find all of the schedulable components (processors, reporting tasks) that refer to this Controller Service,
         // or a service that references this controller service, etc.
         final List<ProcessorNode> processors = findRecursiveReferences(serviceNode, ProcessorNode.class);
         final List<ReportingTaskNode> reportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
-        
+
         // verify that  we can stop all components (that are running) before doing anything
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 node.verifyCanStop();
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 node.verifyCanStop();
             }
         }
-        
+
         // stop all of the components that are running
-        for ( final ProcessorNode node : processors ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ProcessorNode node : processors) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 node.getProcessGroup().stopProcessor(node);
             }
         }
-        for ( final ReportingTaskNode node : reportingTasks ) {
-            if ( node.getScheduledState() == ScheduledState.RUNNING ) {
+        for (final ReportingTaskNode node : reportingTasks) {
+            if (node.getScheduledState() == ScheduledState.RUNNING) {
                 processScheduler.unschedule(node);
             }
         }
     }
-    
+
     @Override
     public void enableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanEnable();
         processScheduler.enableControllerService(serviceNode);
     }
-    
+
     @Override
     public void enableControllerServices(final Collection<ControllerServiceNode> serviceNodes) {
         final Set<ControllerServiceNode> servicesToEnable = new HashSet<>();
         // Ensure that all nodes are already disabled
-        for ( final ControllerServiceNode serviceNode : serviceNodes ) {
+        for (final ControllerServiceNode serviceNode : serviceNodes) {
             final ControllerServiceState curState = serviceNode.getState();
-            if ( ControllerServiceState.DISABLED.equals(curState) ) {
+            if (ControllerServiceState.DISABLED.equals(curState)) {
                 servicesToEnable.add(serviceNode);
             } else {
                 logger.warn("Cannot enable {} because it is not disabled; current state is {}", serviceNode, curState);
             }
         }
-        
+
         // determine the order to load the services. We have to ensure that if service A references service B, then B
         // is enabled first, and so on.
         final Map<String, ControllerServiceNode> idToNodeMap = new HashMap<>();
-        for ( final ControllerServiceNode node : servicesToEnable ) {
+        for (final ControllerServiceNode node : servicesToEnable) {
             idToNodeMap.put(node.getIdentifier(), node);
         }
-        
+
         // We can have many Controller Services dependent on one another. We can have many of these
         // disparate lists of Controller Services that are dependent on one another. We refer to each
         // of these as a branch.
         final List<List<ControllerServiceNode>> branches = determineEnablingOrder(idToNodeMap);
 
-        if ( branches.isEmpty() ) {
+        if (branches.isEmpty()) {
             logger.info("No Controller Services to enable");
             return;
         } else {
             logger.info("Will enable {} Controller Services", servicesToEnable.size());
         }
-        
+
         // Mark all services that are configured to be enabled as 'ENABLING'. This allows Processors, reporting tasks
         // to be valid so that they can be scheduled.
-        for ( final List<ControllerServiceNode> branch : branches ) {
-            for ( final ControllerServiceNode nodeToEnable : branch ) {
+        for (final List<ControllerServiceNode> branch : branches) {
+            for (final ControllerServiceNode nodeToEnable : branch) {
                 nodeToEnable.setState(ControllerServiceState.ENABLING);
             }
         }
-        
+
         final Set<ControllerServiceNode> enabledNodes = Collections.synchronizedSet(new HashSet<ControllerServiceNode>());
         final ExecutorService executor = Executors.newFixedThreadPool(Math.min(10, branches.size()));
-        for ( final List<ControllerServiceNode> branch : branches ) {
+        for (final List<ControllerServiceNode> branch : branches) {
             final Runnable enableBranchRunnable = new Runnable() {
                 @Override
                 public void run() {
                     logger.debug("Enabling Controller Service Branch {}", branch);
-                    
-                    for ( final ControllerServiceNode serviceNode : branch ) {
+
+                    for (final ControllerServiceNode serviceNode : branch) {
                         try {
-                            if ( !enabledNodes.contains(serviceNode) ) {
+                            if (!enabledNodes.contains(serviceNode)) {
                                 enabledNodes.add(serviceNode);
-                                
+
                                 logger.info("Enabling {}", serviceNode);
                                 try {
                                     processScheduler.enableControllerService(serviceNode);
                                 } catch (final Exception e) {
                                     logger.error("Failed to enable " + serviceNode + " due to " + e);
-                                    if ( logger.isDebugEnabled() ) {
+                                    if (logger.isDebugEnabled()) {
                                         logger.error("", e);
                                     }
-                                    
-                                    if ( bulletinRepo != null ) {
+
+                                    if (bulletinRepo != null) {
                                         bulletinRepo.addBulletin(BulletinFactory.createBulletin(
-                                            "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
+                                                "Controller Service", Severity.ERROR.name(), "Could not start " + serviceNode + " due to " + e));
                                     }
                                 }
                             }
-                            
+
                             // wait for service to finish enabling.
-                            while ( ControllerServiceState.ENABLING.equals(serviceNode.getState()) ) {
+                            while (ControllerServiceState.ENABLING.equals(serviceNode.getState())) {
                                 try {
                                     Thread.sleep(100L);
-                                } catch (final InterruptedException ie) {}
+                                } catch (final InterruptedException ie) {
+                                }
                             }
-                            
+
                             logger.info("State for {} is now {}", serviceNode, serviceNode.getState());
                         } catch (final Exception e) {
                             logger.error("Failed to enable {} due to {}", serviceNode, e.toString());
-                            if ( logger.isDebugEnabled() ) {
+                            if (logger.isDebugEnabled()) {
                                 logger.error("", e);
                             }
                         }
                     }
                 }
             };
-            
+
             executor.submit(enableBranchRunnable);
         }
-        
+
         executor.shutdown();
     }
-    
+
     static List<List<ControllerServiceNode>> determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap) {
         final List<List<ControllerServiceNode>> orderedNodeLists = new ArrayList<>();
-        
-        for ( final ControllerServiceNode node : serviceNodeMap.values() ) {
-            if ( orderedNodeLists.contains(node) ) {
+
+        for (final ControllerServiceNode node : serviceNodeMap.values()) {
+            if (orderedNodeLists.contains(node)) {
                 continue;   // this node is already in the list.
             }
-            
+
             final List<ControllerServiceNode> branch = new ArrayList<>();
             determineEnablingOrder(serviceNodeMap, node, branch, new HashSet<ControllerServiceNode>());
             orderedNodeLists.add(branch);
         }
-        
+
         return orderedNodeLists;
     }
-    
-    
-    private static void determineEnablingOrder(final Map<String, ControllerServiceNode> serviceNodeMap, final ControllerServiceNode contextNode, final List<ControllerServiceNode> orderedNodes, final Set<ControllerServiceNode> visited) {
-        if ( visited.contains(contextNode) ) {
+
+    private static void determineEnablingOrder(
+            final Map<String, ControllerServiceNode> serviceNodeMap,
+            final ControllerServiceNode contextNode,
+            final List<ControllerServiceNode> orderedNodes,
+            final Set<ControllerServiceNode> visited) {
+        if (visited.contains(contextNode)) {
             return;
         }
-        
-        for ( final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet() ) {
-            if ( entry.getKey().getControllerServiceDefinition() != null ) {
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : contextNode.getProperties().entrySet()) {
+            if (entry.getKey().getControllerServiceDefinition() != null) {
                 final String referencedServiceId = entry.getValue();
-                if ( referencedServiceId != null ) {
+                if (referencedServiceId != null) {
                     final ControllerServiceNode referencedNode = serviceNodeMap.get(referencedServiceId);
-                    if ( !orderedNodes.contains(referencedNode) ) {
+                    if (!orderedNodes.contains(referencedNode)) {
                         visited.add(contextNode);
                         determineEnablingOrder(serviceNodeMap, referencedNode, orderedNodes, visited);
                     }
@@ -427,12 +425,11 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
             }
         }
 
-        if ( !orderedNodes.contains(contextNode) ) {
+        if (!orderedNodes.contains(contextNode)) {
             orderedNodes.add(contextNode);
         }
     }
-    
-    
+
     @Override
     public void disableControllerService(final ControllerServiceNode serviceNode) {
         serviceNode.verifyCanDisable();
@@ -461,7 +458,7 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
         final ControllerServiceNode node = controllerServices.get(serviceIdentifier);
         return (node == null) ? false : (ControllerServiceState.ENABLING == node.getState());
     }
-    
+
     @Override
     public ControllerServiceNode getControllerServiceNode(final String serviceIdentifier) {
         return controllerServices.get(serviceIdentifier);
@@ -478,157 +475,158 @@ public class StandardControllerServiceProvider implements ControllerServiceProvi
 
         return identifiers;
     }
-    
+
     @Override
     public String getControllerServiceName(final String serviceIdentifier) {
-    	final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
-    	return node == null ? null : node.getName();
+        final ControllerServiceNode node = getControllerServiceNode(serviceIdentifier);
+        return node == null ? null : node.getName();
     }
-    
+
+    @Override
     public void removeControllerService(final ControllerServiceNode serviceNode) {
         final ControllerServiceNode existing = controllerServices.get(serviceNode.getIdentifier());
-        if ( existing == null || existing != serviceNode ) {
+        if (existing == null || existing != serviceNode) {
             throw new IllegalStateException("Controller Service " + serviceNode + " does not exist in this Flow");
         }
-        
+
         serviceNode.verifyCanDelete();
-        
+
         try (final NarCloseable x = NarCloseable.withNarLoader()) {
             final ConfigurationContext configurationContext = new StandardConfigurationContext(serviceNode, this);
             ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, serviceNode.getControllerServiceImplementation(), configurationContext);
         }
-        
-        for ( final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet() ) {
+
+        for (final Map.Entry<PropertyDescriptor, String> entry : serviceNode.getProperties().entrySet()) {
             final PropertyDescriptor descriptor = entry.getKey();
-            if (descriptor.getControllerServiceDefinition() != null ) {
+            if (descriptor.getControllerServiceDefinition() != null) {
                 final String value = entry.getValue() == null ? descriptor.getDefaultValue() : entry.getValue();
-                if ( value != null ) {
+                if (value != null) {
                     final ControllerServiceNode referencedNode = getControllerServiceNode(value);
-                    if ( referencedNode != null ) {
+                    if (referencedNode != null) {
                         referencedNode.removeReference(serviceNode);
                     }
                 }
             }
         }
-        
+
         controllerServices.remove(serviceNode.getIdentifier());
     }
-    
+
     @Override
     public Set<ControllerServiceNode> getAllControllerServices() {
-    	return new HashSet<>(controllerServices.values());
+        return new HashSet<>(controllerServices.values());
     }
-    
-    
+
     /**
-     * Returns a List of all components that reference the given referencedNode (either directly or indirectly through
-     * another service) that are also of the given componentType. The list that is returned is in the order in which they will
-     * need to be 'activated' (enabled/started).
-     * @param referencedNode
-     * @param componentType
-     * @return
+     * Returns a List of all components that reference the given referencedNode
+     * (either directly or indirectly through another service) that are also of
+     * the given componentType. The list that is returned is in the order in
+     * which they will need to be 'activated' (enabled/started).
+     *
+     * @param referencedNode node
+     * @param componentType type
+     * @return list of components
      */
     private <T> List<T> findRecursiveReferences(final ControllerServiceNode referencedNode, final Class<T> componentType) {
         final List<T> references = new ArrayList<>();
-        
-        for ( final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents() ) {
-            if ( componentType.isAssignableFrom(referencingComponent.getClass()) ) {
+
+        for (final ConfiguredComponent referencingComponent : referencedNode.getReferences().getReferencingComponents()) {
+            if (componentType.isAssignableFrom(referencingComponent.getClass())) {
                 references.add(componentType.cast(referencingComponent));
             }
-            
-            if ( referencingComponent instanceof ControllerServiceNode ) {
+
+            if (referencingComponent instanceof ControllerServiceNode) {
                 final ControllerServiceNode referencingNode = (ControllerServiceNode) referencingComponent;
-                
+
                 // find components recursively that depend on referencingNode.
                 final List<T> recursive = findRecursiveReferences(referencingNode, componentType);
-                
+
                 // For anything that depends on referencing node, we want to add it to the list, but we know
                 // that it must come after the referencing node, so we first remove any existing occurrence.
                 references.removeAll(recursive);
                 references.addAll(recursive);
             }
         }
-        
+
         return references;
     }
 
-    
     @Override
     public void enableReferencingServices(final ControllerServiceNode serviceNode) {
         final List<ControllerServiceNode> recursiveReferences = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         enableReferencingServices(serviceNode, recursiveReferences);
     }
-    
+
     private void enableReferencingServices(final ControllerServiceNode serviceNode, final List<ControllerServiceNode> recursiveReferences) {
-        if ( serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING ) {
+        if (serviceNode.getState() != ControllerServiceState.ENABLED && serviceNode.getState() != ControllerServiceState.ENABLING) {
             serviceNode.verifyCanEnable(new HashSet<>(recursiveReferences));
         }
-        
+
         final Set<ControllerServiceNode> ifEnabled = new HashSet<>();
         final List<ControllerServiceNode> toEnable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
-        for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+        for (final ControllerServiceNode nodeToEnable : toEnable) {
             final ControllerServiceState state = nodeToEnable.getState();
-            if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+            if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
                 nodeToEnable.verifyCanEnable(ifEnabled);
                 ifEnabled.add(nodeToEnable);
             }
         }
-        
-        for ( final ControllerServiceNode nodeToEnable : toEnable ) {
+
+        for (final ControllerServiceNode nodeToEnable : toEnable) {
             final ControllerServiceState state = nodeToEnable.getState();
-            if ( state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING ) {
+            if (state != ControllerServiceState.ENABLED && state != ControllerServiceState.ENABLING) {
                 enableControllerService(nodeToEnable);
             }
         }
     }
-    
+
     @Override
     public void verifyCanEnableReferencingServices(final ControllerServiceNode serviceNode) {
         final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
-        
-        for ( final ControllerServiceNode referencingService : referencingServices ) {
+
+        for (final ControllerServiceNode referencingService : referencingServices) {
             referencingService.verifyCanEnable(referencingServiceSet);
         }
     }
-    
+
     @Override
     public void verifyCanScheduleReferencingComponents(final ControllerServiceNode serviceNode) {
         final List<ControllerServiceNode> referencingServices = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final List<ReportingTaskNode> referencingReportingTasks = findRecursiveReferences(serviceNode, ReportingTaskNode.class);
         final List<ProcessorNode> referencingProcessors = findRecursiveReferences(serviceNode, ProcessorNode.class);
-        
+
         final Set<ControllerServiceNode> referencingServiceSet = new HashSet<>(referencingServices);
-        
-        for ( final ReportingTaskNode taskNode : referencingReportingTasks ) {
-            if ( taskNode.getScheduledState() != ScheduledState.DISABLED ) {
+
+        for (final ReportingTaskNode taskNode : referencingReportingTasks) {
+            if (taskNode.getScheduledState() != ScheduledState.DISABLED) {
                 taskNode.verifyCanStart(referencingServiceSet);
             }
         }
-        
-        for ( final ProcessorNode procNode : referencingProcessors ) {
-            if ( procNode.getScheduledState() != ScheduledState.DISABLED ) {
+
+        for (final ProcessorNode procNode : referencingProcessors) {
+            if (procNode.getScheduledState() != ScheduledState.DISABLED) {
                 procNode.verifyCanStart(referencingServiceSet);
             }
         }
     }
-    
+
     @Override
     public void verifyCanDisableReferencingServices(final ControllerServiceNode serviceNode) {
         // Get a list of all Controller Services that need to be disabled, in the order that they need to be
         // disabled.
         final List<ControllerServiceNode> toDisable = findRecursiveReferences(serviceNode, ControllerServiceNode.class);
         final Set<ControllerServiceNode> serviceSet = new HashSet<>(toDisable);
-        
-        for ( final ControllerServiceNode nodeToDisable : toDisable ) {
+
+        for (final ControllerServiceNode nodeToDisable : toDisable) {
             final ControllerServiceState state = nodeToDisable.getState();
-            
-            if ( state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING ) {
+
+            if (state != ControllerServiceState.DISABLED && state != ControllerServiceState.DISABLING) {
                 nodeToDisable.verifyCanDisable(serviceSet);
             }
         }
     }
-    
+
     @Override
     public void verifyCanStopReferencingComponents(final ControllerServiceNode serviceNode) {
         // we can always stop referencing components

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
index c470b99..701adcf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceReference.java
@@ -65,9 +65,9 @@ public class StandardControllerServiceReference implements ControllerServiceRefe
         for (final ConfiguredComponent component : components) {
             if (component instanceof ControllerServiceNode) {
                 serviceNodes.add((ControllerServiceNode) component);
-                
+
                 final ControllerServiceState state = ((ControllerServiceNode) component).getState();
-                if ( state != ControllerServiceState.DISABLED ) {
+                if (state != ControllerServiceState.DISABLED) {
                     activeReferences.add(component);
                 }
             } else if (isRunning(component)) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
index 89ac846..6970fce 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StandardMetricDescriptor.java
@@ -31,7 +31,8 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
         this(field, label, description, formatter, valueFunction, null);
     }
 
-    public StandardMetricDescriptor(final String field, final String label, final String description, final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
+    public StandardMetricDescriptor(final String field, final String label, final String description,
+            final MetricDescriptor.Formatter formatter, final ValueMapper<T> valueFunction, final ValueReducer<StatusSnapshot, Long> reducer) {
         this.field = field;
         this.label = label;
         this.description = description;
@@ -40,41 +41,21 @@ public class StandardMetricDescriptor<T> implements MetricDescriptor<T> {
         this.reducer = reducer == null ? new SumReducer() : reducer;
     }
 
-    /**
-     * The name of this status field.
-     *
-     * @return
-     */
     @Override
     public String getField() {
         return field;
     }
 
-    /**
-     * The label of this status field.
-     *
-     * @return
-     */
     @Override
     public String getLabel() {
         return label;
     }
 
-    /**
-     * The description of this status field.
-     *
-     * @return
-     */
     @Override
     public String getDescription() {
         return description;
     }
 
-    /**
-     * The formatter for this descriptor.
-     *
-     * @return
-     */
     @Override
     public MetricDescriptor.Formatter getFormatter() {
         return formatter;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
index 0872192..d2a983a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/VolatileComponentStatusRepository.java
@@ -91,7 +91,8 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     @Override
     public synchronized void capture(final ProcessGroupStatus rootGroupStatus, final Date timestamp) {
-        captures.add(new Capture(timestamp, ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR, ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP)));
+        captures.add(new Capture(timestamp, ComponentStatusReport.fromProcessGroupStatus(rootGroupStatus, ComponentType.PROCESSOR,
+                ComponentType.CONNECTION, ComponentType.PROCESS_GROUP, ComponentType.REMOTE_PROCESS_GROUP)));
         logger.debug("Captured metrics for {}", this);
         lastCaptureTime = Math.max(lastCaptureTime, timestamp.getTime());
     }
@@ -269,48 +270,57 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     public static enum RemoteProcessGroupStatusDescriptor {
 
-        SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)", "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return status.getSentContentSize();
-            }
-        })),
-        SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)", "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getSentCount().longValue());
-            }
-        })),
-        RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)", "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return status.getReceivedContentSize();
-            }
-        })),
-        RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)", "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getReceivedCount().longValue());
-            }
-        })),
-        RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
-            }
-        })),
-        SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second", "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf(status.getSentContentSize().longValue() / 300L);
-            }
-        })),
-        TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second", "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
-            @Override
-            public Long getValue(final RemoteProcessGroupStatus status) {
-                return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
-            }
-        })),
+        SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)",
+                "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return status.getSentContentSize();
+                    }
+                })),
+        SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)",
+                "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return Long.valueOf(status.getSentCount().longValue());
+                    }
+                })),
+        RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)",
+                "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return status.getReceivedContentSize();
+                    }
+                })),
+        RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)",
+                "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return Long.valueOf(status.getReceivedCount().longValue());
+                    }
+                })),
+        RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second",
+                "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
+                Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
+                    }
+                })),
+        SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second",
+                "The data rate at which data was sent to the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return Long.valueOf(status.getSentContentSize().longValue() / 300L);
+                    }
+                })),
+        TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second",
+                "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
+                Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final RemoteProcessGroupStatus status) {
+                        return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
+                    }
+                })),
         AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
                 "averageLineageDuration",
                 "Average Lineage Duration (5 mins)",
@@ -358,66 +368,83 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     public static enum ProcessGroupStatusDescriptor {
 
-        BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getBytesRead();
-            }
-        })),
-        BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getBytesWritten();
-            }
-        })),
-        BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getBytesRead() + status.getBytesWritten();
-            }
-        })),
-        INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getInputContentSize();
-            }
-        })),
-        INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getInputCount().longValue();
-            }
-        })),
-        OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getOutputContentSize();
-            }
-        })),
-        OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getOutputCount().longValue();
-            }
-        })),
-        QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes", "The cumulative size of all FlowFiles queued in all Connections of this Process Group", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getQueuedContentSize();
-            }
-        })),
-        QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return status.getQueuedCount().longValue();
-            }
-        })),
-        TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
-            @Override
-            public Long getValue(final ProcessGroupStatus status) {
-                return calculateTaskMillis(status);
-            }
-        }));
+        BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
+                "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getBytesRead();
+                    }
+                })),
+        BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)",
+                "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getBytesWritten();
+                    }
+                })),
+        BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+                "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getBytesRead() + status.getBytesWritten();
+                    }
+                })),
+        INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
+                "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getInputContentSize();
+                    }
+                })),
+        INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)",
+                "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+                Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getInputCount().longValue();
+                    }
+                })),
+        OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)",
+                "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getOutputContentSize();
+                    }
+                })),
+        OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)",
+                "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+                Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getOutputCount().longValue();
+                    }
+                })),
+        QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+                "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
+                Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getQueuedContentSize();
+                    }
+                })),
+        QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+                "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return status.getQueuedCount().longValue();
+                    }
+                })),
+        TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)",
+                "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
+                Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+                    @Override
+                    public Long getValue(final ProcessGroupStatus status) {
+                        return calculateTaskMillis(status);
+                    }
+                }));
 
         private MetricDescriptor<ProcessGroupStatus> descriptor;
 
@@ -436,42 +463,48 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     public static enum ConnectionStatusDescriptor {
 
-        INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return status.getInputBytes();
-            }
-        })),
-        INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return Long.valueOf(status.getInputCount());
-            }
-        })),
-        OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return status.getOutputBytes();
-            }
-        })),
-        OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return Long.valueOf(status.getOutputCount());
-            }
-        })),
-        QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes", "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return status.getQueuedBytes();
-            }
-        })),
-        QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count", "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
-            @Override
-            public Long getValue(final ConnectionStatus status) {
-                return Long.valueOf(status.getQueuedCount());
-            }
-        }));
+        INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("inputBytes", "Bytes In (5 mins)",
+                "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return status.getInputBytes();
+                    }
+                })),
+        INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("inputCount", "FlowFiles In (5 mins)",
+                "The number of FlowFiles that were transferred to this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return Long.valueOf(status.getInputCount());
+                    }
+                })),
+        OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>("outputBytes", "Bytes Out (5 mins)",
+                "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return status.getOutputBytes();
+                    }
+                })),
+        OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>("outputCount", "FlowFiles Out (5 mins)",
+                "The number of FlowFiles that were pulled from this Connection in the past 5 minutes", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return Long.valueOf(status.getOutputCount());
+                    }
+                })),
+        QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>("queuedBytes", "Queued Bytes",
+                "The number of Bytes queued in this Connection", Formatter.DATA_SIZE, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return status.getQueuedBytes();
+                    }
+                })),
+        QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>("queuedCount", "Queued Count",
+                "The number of FlowFiles queued in this Connection", Formatter.COUNT, new ValueMapper<ConnectionStatus>() {
+                    @Override
+                    public Long getValue(final ConnectionStatus status) {
+                        return Long.valueOf(status.getQueuedCount());
+                    }
+                }));
 
         private MetricDescriptor<ConnectionStatus> descriptor;
 
@@ -490,66 +523,76 @@ public class VolatileComponentStatusRepository implements ComponentStatusReposit
 
     public static enum ProcessorStatusDescriptor {
 
-        BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)", "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getBytesRead();
-            }
-        })),
-        BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)", "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getBytesWritten();
-            }
-        })),
-        BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)", "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getBytesRead() + status.getBytesWritten();
-            }
-        })),
-        INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)", "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getInputBytes();
-            }
-        })),
-        INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)", "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getInputCount());
-            }
-        })),
-        OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)", "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return status.getOutputBytes();
-            }
-        })),
-        OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)", "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getOutputCount());
-            }
-        })),
-        TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getInvocations());
-            }
-        })),
-        TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)", "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
-            }
-        })),
-        FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)", "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
-            @Override
-            public Long getValue(final ProcessorStatus status) {
-                return Long.valueOf(status.getFlowFilesRemoved());
-            }
-        })),
+        BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>("bytesRead", "Bytes Read (5 mins)",
+                "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getBytesRead();
+                    }
+                })),
+        BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>("bytesWritten", "Bytes Written (5 mins)",
+                "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getBytesWritten();
+                    }
+                })),
+        BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+                "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getBytesRead() + status.getBytesWritten();
+                    }
+                })),
+        INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("inputBytes", "Bytes In (5 mins)",
+                "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getInputBytes();
+                    }
+                })),
+        INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("inputCount", "FlowFiles In (5 mins)",
+                "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getInputCount());
+                    }
+                })),
+        OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>("outputBytes", "Bytes Out (5 mins)",
+                "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return status.getOutputBytes();
+                    }
+                })),
+        OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>("outputCount", "FlowFiles Out (5 mins)",
+                "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getOutputCount());
+                    }
+                })),
+        TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>("taskCount", "Tasks (5 mins)", "The number of tasks that this Processor has completed in the past 5 minutes",
+                Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getInvocations());
+                    }
+                })),
+        TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>("taskMillis", "Total Task Duration (5 mins)",
+                "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes", Formatter.DURATION, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
+                    }
+                })),
+        FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>("flowFilesRemoved", "FlowFiles Removed (5 mins)",
+                "The total number of FlowFiles removed by this Processor in the last 5 minutes", Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+                    @Override
+                    public Long getValue(final ProcessorStatus status) {
+                        return Long.valueOf(status.getFlowFilesRemoved());
+                    }
+                })),
         AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
                 "averageLineageDuration",
                 "Average Lineage Duration (5 mins)",

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
index 5ecd22e..f3cbb90 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunConnectableTask.java
@@ -35,8 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Continually runs a Connectable as long as the processor has work to do. {@link #call()} will return
- * <code>true</code> if the Connectable should be yielded, <code>false</code> otherwise.
+ * Continually runs a Connectable as long as the processor has work to do.
+ * {@link #call()} will return <code>true</code> if the Connectable should be
+ * yielded, <code>false</code> otherwise.
  */
 public class ContinuallyRunConnectableTask implements Callable<Boolean> {
 
@@ -60,7 +61,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
         if (!scheduleState.isScheduled()) {
             return false;
         }
-        
+
         // Connectable should run if the following conditions are met:
         // 1. It is not yielded.
         // 2. It has incoming connections with FlowFiles queued or doesn't expect incoming connections
@@ -106,7 +107,7 @@ public class ContinuallyRunConnectableTask implements Callable<Boolean> {
             // yield for just a bit.
             return true;
         }
-        
-        return false;	// do not yield
+
+        return false; // do not yield
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
index cff8744..baed6ae 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ContinuallyRunProcessorTask.java
@@ -43,10 +43,10 @@ import org.apache.nifi.util.ReflectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
- * Continually runs a processor as long as the processor has work to do. {@link #call()} will return
- * <code>true</code> if the processor should be yielded, <code>false</code> otherwise.
+ * Continually runs a processor as long as the processor has work to do.
+ * {@link #call()} will return <code>true</code> if the processor should be
+ * yielded, <code>false</code> otherwise.
  */
 public class ContinuallyRunProcessorTask implements Callable<Boolean> {
 
@@ -61,7 +61,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
     private final int numRelationships;
 
     public ContinuallyRunProcessorTask(final SchedulingAgent schedulingAgent, final ProcessorNode procNode,
-            final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState, 
+            final FlowController flowController, final ProcessContextFactory contextFactory, final ScheduleState scheduleState,
             final StandardProcessContext processContext) {
 
         this.schedulingAgent = schedulingAgent;
@@ -163,9 +163,9 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
                 if (batch) {
                     rawSession.commit();
                 }
-    
+
                 final long processingNanos = System.nanoTime() - startNanos;
-    
+
                 // if the processor is no longer scheduled to run and this is the last thread,
                 // invoke the OnStopped methods
                 if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
@@ -174,7 +174,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
                         flowController.heartbeat();
                     }
                 }
-    
+
                 try {
                     final StandardFlowFileEvent procEvent = new StandardFlowFileEvent(procNode.getIdentifier());
                     procEvent.setProcessingNanos(processingNanos);
@@ -188,7 +188,7 @@ public class ContinuallyRunProcessorTask implements Callable<Boolean> {
                 scheduleState.decrementActiveThreadCount();
             }
         }
-        
+
         return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
index 0c472c8..5724bb4 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/tasks/ReportingTaskWrapper.java
@@ -42,7 +42,7 @@ public class ReportingTaskWrapper implements Runnable {
             taskNode.getReportingTask().onTrigger(taskNode.getReportingContext());
         } catch (final Throwable t) {
             final ComponentLog componentLog = new SimpleProcessLogger(taskNode.getIdentifier(), taskNode.getReportingTask());
-            componentLog.error("Error running task {} due to {}", new Object[] {taskNode.getReportingTask(), t.toString()});
+            componentLog.error("Error running task {} due to {}", new Object[]{taskNode.getReportingTask(), t.toString()});
             if (componentLog.isDebugEnabled()) {
                 componentLog.error("", t);
             }
@@ -52,7 +52,9 @@ public class ReportingTaskWrapper implements Runnable {
                 // invoke the OnStopped methods
                 if (!scheduleState.isScheduled() && scheduleState.getActiveThreadCount() == 1 && scheduleState.mustCallOnStoppedMethods()) {
                     try (final NarCloseable x = NarCloseable.withNarLoader()) {
-                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class, taskNode.getReportingTask(), taskNode.getConfigurationContext());
+                        ReflectionUtils.quietlyInvokeMethodsWithAnnotation(
+                                OnStopped.class, org.apache.nifi.processor.annotation.OnStopped.class,
+                                taskNode.getReportingTask(), taskNode.getConfigurationContext());
                     }
                 }
             } finally {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
index be79c5b..fccd10e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/encrypt/StringEncryptor.java
@@ -76,7 +76,7 @@ public final class StringEncryptor {
      * Creates an instance of the nifi sensitive property encryptor. Validates
      * that the encryptor is actually working.
      *
-     * @return
+     * @return encryptor
      * @throws EncryptionException if any issues arise initializing or
      * validating the encryptor
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
index 76e8e3e..3be178f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/engine/FlowEngine.java
@@ -27,9 +27,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
 
-/**
- * @author unattributed
- */
 public final class FlowEngine extends ScheduledThreadPoolExecutor {
 
     private static final Logger logger = LoggerFactory.getLogger(FlowEngine.class);
@@ -39,19 +36,20 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
      *
      * @param corePoolSize the maximum number of threads available to tasks
      * running in the engine.
-     * @param threadNamePrefix
+     * @param threadNamePrefix for naming the thread
      */
     public FlowEngine(int corePoolSize, final String threadNamePrefix) {
-    	this(corePoolSize, threadNamePrefix, false);
+        this(corePoolSize, threadNamePrefix, false);
     }
-    	
+
     /**
      * Creates a new instance of FlowEngine
      *
      * @param corePoolSize the maximum number of threads available to tasks
      * running in the engine.
-     * @param threadNamePrefix
-     * @param deamon if true, the thread pool will be populated with daemon threads, otherwise the threads will not be marked as daemon.
+     * @param threadNamePrefix for thread naming
+     * @param daemon if true, the thread pool will be populated with daemon
+     * threads, otherwise the threads will not be marked as daemon.
      */
     public FlowEngine(int corePoolSize, final String threadNamePrefix, final boolean daemon) {
         super(corePoolSize);
@@ -62,8 +60,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
             @Override
             public Thread newThread(final Runnable r) {
                 final Thread t = defaultThreadFactory.newThread(r);
-                if ( daemon ) {
-                	t.setDaemon(true);
+                if (daemon) {
+                    t.setDaemon(true);
                 }
                 t.setName(threadNamePrefix + " Thread-" + threadIndex.incrementAndGet());
                 return t;
@@ -75,8 +73,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
      * Hook method called by the running thread whenever a runnable task is
      * given to the thread to run.
      *
-     * @param thread
-     * @param runnable
+     * @param thread the thread that will run the task
+     * @param runnable the task that is about to be executed
      */
     @Override
     protected void beforeExecute(final Thread thread, final Runnable runnable) {
@@ -90,8 +88,8 @@ public final class FlowEngine extends ScheduledThreadPoolExecutor {
      * execution of the runnable completed. Logs the fact of completion and any
      * errors that might have occurred.
      *
-     * @param runnable
-     * @param throwable
+     * @param runnable the task that has finished executing
+     * @param throwable the error that terminated the task, or null if none
      */
     @Override
     protected void afterExecute(final Runnable runnable, final Throwable throwable) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/888254b2/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 044541b..e8708bd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -180,7 +180,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
      * bulletin strategy is employed, bulletins will not be persisted in this
      * repository and will be sent to the specified strategy instead.
      *
-     * @param strategy
+     * @param strategy bulletin strategy
      */
     public void overrideDefaultBulletinProcessing(final BulletinProcessingStrategy strategy) {
         Objects.requireNonNull(strategy);