You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/02 22:47:57 UTC

[01/10] incubator-nifi git commit: NIFI-724: Enable bulletins for reporting tasks and controller services

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop a09180799 -> eddc071b8


NIFI-724: Enable bulletins for reporting tasks and controller services


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f1adb8bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f1adb8bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f1adb8bf

Branch: refs/heads/develop
Commit: f1adb8bf034a8407dfdd655e441c74c10a61b18f
Parents: e767f5c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jun 24 14:03:34 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jun 24 14:03:34 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/reporting/Bulletin.java     | 11 ++-
 .../apache/nifi/reporting/ComponentType.java    | 58 ++++++++++++++
 .../manager/impl/ClusteredReportingContext.java | 46 ++++++++++-
 .../cluster/manager/impl/WebClusterManager.java | 80 ++++++++++++--------
 .../org/apache/nifi/events/BulletinFactory.java | 30 ++++++--
 .../org/apache/nifi/events/SystemBulletin.java  |  2 +
 .../nifi/logging/LogRepositoryFactory.java      |  6 +-
 .../apache/nifi/controller/FlowController.java  | 35 ++++++---
 .../service/ControllerServiceLoader.java        | 11 +--
 .../nifi/events/VolatileBulletinRepository.java |  7 +-
 .../org/apache/nifi/jaxb/AdaptedBulletin.java   | 10 +++
 .../org/apache/nifi/jaxb/BulletinAdapter.java   |  3 +-
 .../logging/ControllerServiceLogObserver.java   | 45 +++++++++++
 .../nifi/logging/ReportingTaskLogObserver.java  | 45 +++++++++++
 .../nifi/remote/StandardRemoteProcessGroup.java | 16 ++--
 .../nifi/remote/StandardRootGroupPort.java      |  4 +-
 16 files changed, 341 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
index 87443a3..fe370ae 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
@@ -34,6 +34,7 @@ public abstract class Bulletin implements Comparable<Bulletin> {
     private String groupId;
     private String sourceId;
     private String sourceName;
+    private ComponentType sourceType;
 
     protected Bulletin(final long id) {
         this.timestamp = new Date();
@@ -104,9 +105,17 @@ public abstract class Bulletin implements Comparable<Bulletin> {
         this.sourceName = sourceName;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(ComponentType sourceType) {
+        this.sourceType = sourceType;
+    }
+
     @Override
     public String toString() {
-        return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + '}';
+        return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}';
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
new file mode 100644
index 0000000..97f3538
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+/**
+ * An Enumeration for indicating which type of component a Bulletin is associated with
+ */
+public enum ComponentType {
+
+    /**
+     * Bulletin is associated with a Processor
+     */
+    PROCESSOR,
+
+    /**
+     * Bulletin is associated with a Remote Process Group
+     */
+    REMOTE_PROCESS_GROUP,
+
+    /**
+     * Bulletin is associated with an Input Port
+     */
+    INPUT_PORT,
+
+    /**
+     * Bulletin is associated with an Output Port
+     */
+    OUTPUT_PORT,
+
+    /**
+     * Bulletin is associated with a Reporting Task
+     */
+    REPORTING_TASK,
+
+    /**
+     * Bulletin is associated with a Controller Service
+     */
+    CONTROLLER_SERVICE,
+
+    /**
+     * Bulletin is a system-level bulletin, associated with the Flow Controller
+     */
+    FLOW_CONTROLLER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
index e546f87..c6624cc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -24,15 +24,18 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.processor.StandardPropertyValue;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.Severity;
@@ -85,8 +88,9 @@ public class ClusteredReportingContext implements ReportingContext {
         final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
         final String groupId = findGroupId(rootGroupStatus, componentId);
         final String componentName = findComponentName(rootGroupStatus, componentId);
+        final ComponentType componentType = findComponentType(rootGroupStatus, componentId);
 
-        return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message);
+        return BulletinFactory.createBulletin(groupId, componentId, componentType, componentName, category, severity.name(), message);
     }
 
     @Override
@@ -134,6 +138,46 @@ public class ClusteredReportingContext implements ReportingContext {
         return null;
     }
 
+    private ComponentType findComponentType(final ProcessGroupStatus groupStatus, final String componentId) {
+        for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+            if (procStatus.getId().equals(componentId)) {
+                return ComponentType.PROCESSOR;
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return ComponentType.INPUT_PORT;
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return ComponentType.OUTPUT_PORT;
+            }
+        }
+
+        for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) {
+            if (remoteStatus.getId().equals(componentId)) {
+                return ComponentType.REMOTE_PROCESS_GROUP;
+            }
+        }
+
+        for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+            final ComponentType type = findComponentType(childGroup, componentId);
+            if (type != null) {
+                return type;
+            }
+        }
+
+        final ControllerService service = serviceProvider.getControllerService(componentId);
+        if (service != null) {
+            return ComponentType.CONTROLLER_SERVICE;
+        }
+
+        return null;
+    }
+
     private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) {
         for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
             if (procStatus.getId().equals(componentId)) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index d6ba6db..9edc83f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -129,6 +129,7 @@ import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.StandardFlowSerializer;
+import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
@@ -159,7 +160,12 @@ import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -929,7 +935,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             //optional properties for all ReportingTasks
             for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
                 //add global properties common to all tasks
-                Map<String, String> properties = new HashMap<>();
+                final Map<String, String> properties = new HashMap<>();
 
                 //get properties for the specific reporting task - id, name, class,
                 //and schedulingPeriod must be set
@@ -1080,6 +1086,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             }
         }
 
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
         return taskNode;
     }
 
@@ -1368,7 +1379,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+        final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+        return serviceNode;
     }
 
     @Override
@@ -1630,7 +1648,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
 
         // unmarshal the message
-        BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
+        final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
         for (final Bulletin bulletin : payload.getBulletins()) {
             bulletin.setNodeAddress(nodeAddress);
             bulletinRepository.addBulletin(bulletin);
@@ -1688,7 +1706,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             final int numPendingHeartbeats = mostRecentHeartbeats.size();
             if (heartbeatLogger.isDebugEnabled()) {
-                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
             }
 
             for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
@@ -2130,7 +2148,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         readLock.lock();
         try {
             final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
-            return (broadcaster != null && broadcaster.isRunning());
+            return broadcaster != null && broadcaster.isRunning();
         } finally {
             readLock.unlock("isBroadcasting");
         }
@@ -2323,7 +2341,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                                 if (auditService != null) {
                                     try {
                                         auditService.addActions(clusterContext.getActions());
-                                    } catch (Throwable t) {
+                                    } catch (final Throwable t) {
                                         logger.warn("Unable to record actions: " + t.getMessage());
                                         if (logger.isDebugEnabled()) {
                                             logger.warn(StringUtils.EMPTY, t);
@@ -2834,7 +2852,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProcessorEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
+                final ProcessorEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
                 final ProcessorDTO nodeProcessor = nodeResponseEntity.getProcessor();
                 processorMap.put(nodeResponse.getNodeId(), nodeProcessor);
             }
@@ -2851,7 +2869,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProcessorsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+                final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
                 final Set<ProcessorDTO> nodeProcessors = nodeResponseEntity.getProcessors();
 
                 for (final ProcessorDTO nodeProcessor : nodeProcessors) {
@@ -2892,7 +2910,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                         continue;
                     }
 
-                    final ProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+                    final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
                     final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getProcessGroup();
 
                     for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) {
@@ -2952,7 +2970,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                         continue;
                     }
 
-                    final FlowSnippetEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+                    final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
                     final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
 
                     for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
@@ -2995,7 +3013,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             // create a new client response
             clientResponse = new NodeResponse(clientResponse, responseEntity);
-        } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupEndpoint(uri, method))) {
+        } else if (hasSuccessfulClientResponse && isRemoteProcessGroupEndpoint(uri, method)) {
             final RemoteProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
             final RemoteProcessGroupDTO remoteProcessGroup = responseEntity.getRemoteProcessGroup();
 
@@ -3005,7 +3023,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final RemoteProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
+                final RemoteProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
                 final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeResponseEntity.getRemoteProcessGroup();
 
                 remoteProcessGroupMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
@@ -3013,7 +3031,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
-        } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupsEndpoint(uri, method))) {
+        } else if (hasSuccessfulClientResponse && isRemoteProcessGroupsEndpoint(uri, method)) {
             final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
             final Set<RemoteProcessGroupDTO> remoteProcessGroups = responseEntity.getRemoteProcessGroups();
 
@@ -3023,7 +3041,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final RemoteProcessGroupsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+                final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
                 final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = nodeResponseEntity.getRemoteProcessGroups();
 
                 for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeRemoteProcessGroups) {
@@ -3056,7 +3074,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProvenanceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+                final ProvenanceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
                 final ProvenanceDTO nodeQuery = nodeResponseEntity.getProvenance();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeQuery);
@@ -3084,7 +3102,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+                final ControllerServiceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
                 final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
@@ -3102,7 +3120,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+                final ControllerServicesEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
                 final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
 
                 for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
@@ -3136,7 +3154,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 }
 
                 final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
-                        (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+                        nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
                 final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@@ -3154,7 +3172,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+                final ReportingTaskEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
                 final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
@@ -3172,7 +3190,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+                final ReportingTasksEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
                 final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
 
                 for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
@@ -3428,7 +3446,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
     private boolean canChangeNodeState(final String method, final URI uri) {
-        return (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method));
+        return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
     }
 
     private void notifyDataFlowManagementServiceOfNodeStatusChange() {
@@ -3477,7 +3495,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             public void run() {
                 logger.info("Entering safe mode...");
                 final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
-                final long timeToElect = (safeModeSeconds <= 0) ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
+                final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
                 boolean exitSafeMode = false;
                 while (isRunning()) {
 
@@ -3819,7 +3837,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
         final long time = toNormalize.getTime();
-        return new Date(time - (time % numMillis));
+        return new Date(time - time % numMillis);
     }
 
     private NodeDTO createNodeDTO(final Node node) {
@@ -3861,8 +3879,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -3942,8 +3960,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4006,8 +4024,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4070,8 +4088,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
index d1d5e5b..4795827 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -17,24 +17,43 @@
 package org.apache.nifi.events;
 
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
 
-/**
- *
- */
 public final class BulletinFactory {
 
     private static final AtomicLong currentId = new AtomicLong(0);
 
     public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
-        return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message);
+        final ComponentType type;
+        switch (connectable.getConnectableType()) {
+            case REMOTE_INPUT_PORT:
+            case REMOTE_OUTPUT_PORT:
+                type = ComponentType.REMOTE_PROCESS_GROUP;
+                break;
+            case INPUT_PORT:
+                type = ComponentType.INPUT_PORT;
+                break;
+            case OUTPUT_PORT:
+                type = ComponentType.OUTPUT_PORT;
+                break;
+            case PROCESSOR:
+            default:
+                type = ComponentType.PROCESSOR;
+                break;
+        }
+
+        return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
     }
 
-    public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
+    public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName,
+        final String category, final String severity, final String message) {
         final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
         bulletin.setGroupId(groupId);
         bulletin.setSourceId(sourceId);
+        bulletin.setSourceType(sourceType);
         bulletin.setSourceName(sourceName);
         bulletin.setCategory(category);
         bulletin.setLevel(severity);
@@ -47,6 +66,7 @@ public final class BulletinFactory {
         bulletin.setCategory(category);
         bulletin.setLevel(severity);
         bulletin.setMessage(message);
+        bulletin.setSourceType(ComponentType.FLOW_CONTROLLER);
         return bulletin;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
index f97dc46..3359e7e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.events;
 
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
 
 /**
  *
@@ -25,6 +26,7 @@ public class SystemBulletin extends Bulletin {
 
     SystemBulletin(final long id) {
         super(id);
+        setSourceType(ComponentType.FLOW_CONTROLLER);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index 76ca661..d7fa3fc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -41,8 +41,8 @@ public class LogRepositoryFactory {
         logRepositoryClass = clazz;
     }
 
-    public static LogRepository getRepository(final String processorId) {
-        LogRepository repository = repositoryMap.get(requireNonNull(processorId));
+    public static LogRepository getRepository(final String componentId) {
+        LogRepository repository = repositoryMap.get(requireNonNull(componentId));
         if (repository == null) {
             try {
                 repository = logRepositoryClass.newInstance();
@@ -50,7 +50,7 @@ public class LogRepositoryFactory {
                 throw new RuntimeException(e);
             }
 
-            final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository);
+            final LogRepository oldRepository = repositoryMap.putIfAbsent(componentId, repository);
             if (oldRepository != null) {
                 repository = oldRepository;
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ffdd4e..b6edbbb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -137,11 +137,13 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.groups.StandardProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -593,7 +595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         writeLock.lock();
         try {
             if (startDelayedComponents) {
-                LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
+                LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
                 for (final Connectable connectable : startConnectablesAfterInitialization) {
                     if (connectable.getScheduledState() == ScheduledState.DISABLED) {
                         continue;
@@ -1012,7 +1014,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isTerminated() {
         this.readLock.lock();
         try {
-            return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated());
+            return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
         } finally {
             this.readLock.unlock();
         }
@@ -1828,9 +1830,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         } else if (id1.equals(id2)) {
             return true;
         } else {
-            final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1);
-            final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2);
-            return (comparable1.equals(comparable2));
+            final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1;
+            final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2;
+            return comparable1.equals(comparable2);
         }
     }
 
@@ -1964,7 +1966,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
-        return (root == null) ? null : root.findProcessGroup(searchId);
+        return root == null ? null : root.findProcessGroup(searchId);
     }
 
     @Override
@@ -2079,8 +2081,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
                 connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
 
-                flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
-                bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
+                flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
+                bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
             }
 
             if (StringUtils.isNotBlank(conn.getName())) {
@@ -2552,6 +2554,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         reportingTasks.put(id, taskNode);
+
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
         return taskNode;
     }
 
@@ -2616,7 +2624,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+        final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+        return serviceNode;
     }
 
     @Override
@@ -3480,7 +3495,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     if (bulletin.getGroupId() == null) {
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     } else {
-                        escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(),
+                        escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
                                 bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 92fa3b2..b5c3855 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -48,11 +48,12 @@ public class ControllerServiceLoader {
     private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
 
     public static List<ControllerServiceNode> loadControllerServices(
-            final ControllerServiceProvider provider,
-            final InputStream serializedStream,
-            final StringEncryptor encryptor,
-            final BulletinRepository bulletinRepo,
-            final boolean autoResumeState) throws IOException {
+        final ControllerServiceProvider provider,
+        final InputStream serializedStream,
+        final StringEncryptor encryptor,
+        final BulletinRepository bulletinRepo,
+        final boolean autoResumeState) throws IOException {
+
         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
         documentBuilderFactory.setNamespaceAware(true);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index a20e974..a58bf8e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.Filter;
 
@@ -167,7 +168,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
         }
 
         final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        return (buffer == null) ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+        return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
             @Override
             public boolean select(final Bulletin bulletin) {
                 return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
@@ -199,7 +200,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
         ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(groupId);
         if (componentMap == null) {
             componentMap = new ConcurrentHashMap<>();
-            ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap);
+            final ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap);
             if (existing != null) {
                 componentMap = existing;
             }
@@ -225,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
     }
 
     private boolean isControllerBulletin(final Bulletin bulletin) {
-        return bulletin.getGroupId() == null;
+        return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
     }
 
     private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
index 6f1dc45..17c5991 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
@@ -18,6 +18,8 @@ package org.apache.nifi.jaxb;
 
 import java.util.Date;
 
+import org.apache.nifi.reporting.ComponentType;
+
 /**
  *
  */
@@ -32,6 +34,7 @@ public class AdaptedBulletin {
     private String groupId;
     private String sourceId;
     private String sourceName;
+    private ComponentType sourceType;
 
     public String getCategory() {
         return category;
@@ -97,4 +100,11 @@ public class AdaptedBulletin {
         this.timestamp = timestamp;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(ComponentType sourceType) {
+        this.sourceType = sourceType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
index b699348..acbe0dd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
@@ -34,7 +34,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
         if (b.getSourceId() == null) {
             return BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage());
         } else {
-            return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
+            return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceType(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
         }
     }
 
@@ -48,6 +48,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
         aBulletin.setTimestamp(b.getTimestamp());
         aBulletin.setGroupId(b.getGroupId());
         aBulletin.setSourceId(b.getSourceId());
+        aBulletin.setSourceType(b.getSourceType());
         aBulletin.setSourceName(b.getSourceName());
         aBulletin.setCategory(b.getCategory());
         aBulletin.setLevel(b.getLevel());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
new file mode 100644
index 0000000..837e1c4
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ControllerServiceLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ControllerServiceNode serviceNode;
+
+    public ControllerServiceLogObserver(final BulletinRepository bulletinRepository, final ControllerServiceNode serviceNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.serviceNode = serviceNode;
+    }
+
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.REPORTING_TASK,
+            serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
new file mode 100644
index 0000000..e5638d6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ReportingTaskLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ReportingTaskNode taskNode;
+
+    public ReportingTaskLogObserver(final BulletinRepository bulletinRepository, final ReportingTaskNode taskNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.taskNode = taskNode;
+    }
+
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
+            taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index d19b5c1..61516d0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -57,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -164,7 +165,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                 final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();
                 final String sourceId = StandardRemoteProcessGroup.this.getIdentifier();
                 final String sourceName = StandardRemoteProcessGroup.this.getName();
-                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, ComponentType.REMOTE_PROCESS_GROUP,
+                    sourceName, category, severity.name(), message));
             }
         };
 
@@ -227,7 +229,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     @Override
     public String getName() {
         final String name = this.name.get();
-        return (name == null) ? targetUri.toString() : name;
+        return name == null ? targetUri.toString() : name;
     }
 
     @Override
@@ -671,7 +673,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     private ProcessGroup getRootGroup(final ProcessGroup context) {
         final ProcessGroup parent = context.getParent();
-        return (parent == null) ? context : getRootGroup(parent);
+        return parent == null ? context : getRootGroup(parent);
     }
 
     private boolean isWebApiSecure() {
@@ -714,7 +716,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     public Date getLastRefreshTime() {
         readLock.lock();
         try {
-            return (refreshContentsTimestamp == null) ? null : new Date(refreshContentsTimestamp);
+            return refreshContentsTimestamp == null ? null : new Date(refreshContentsTimestamp);
         } finally {
             readLock.unlock();
         }
@@ -855,7 +857,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
         if (ports != null) {
             remotePorts = new LinkedHashSet<>(ports.size());
-            for (PortDTO port : ports) {
+            for (final PortDTO port : ports) {
                 final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
                 final ScheduledState scheduledState = ScheduledState.valueOf(port.getState());
                 descriptor.setId(port.getId());
@@ -1093,7 +1095,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                     }
 
                     final String remoteInstanceId = dto.getInstanceId();
-                    boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
+                    final boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
                     pointsToCluster.set(isPointingToCluster);
                 } else if (statusCode == UNAUTHORIZED_STATUS_CODE) {
                     try {
@@ -1120,7 +1122,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                             new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
                     authorizationIssue = "Unable to determine Site-to-Site availability.";
                 }
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
                 getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
                         StandardRemoteProcessGroup.this.getTargetUri().toString(), e));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4bb1683..9eadec0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -61,6 +61,7 @@ import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.user.NiFiUser;
@@ -108,7 +109,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
                 final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
                 final String sourceId = StandardRootGroupPort.this.getIdentifier();
                 final String sourceName = StandardRootGroupPort.this.getName();
-                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+                final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
+                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, componentType, sourceName, category, severity.name(), message));
             }
         };
 


[06/10] incubator-nifi git commit: NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level

Posted by mc...@apache.org.
NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e7c0461b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e7c0461b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e7c0461b

Branch: refs/heads/develop
Commit: e7c0461b15bff045d68e7ae8814eda2073cba209
Parents: 59aa8ff
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 1 12:54:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 1 12:54:18 2015 -0400

----------------------------------------------------------------------
 .../nifi/events/VolatileBulletinRepository.java | 105 ++++++++++++++-----
 1 file changed, 78 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e7c0461b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index c18fffd..8aeb34d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -36,6 +36,8 @@ public class VolatileBulletinRepository implements BulletinRepository {
     private static final int CONTROLLER_BUFFER_SIZE = 10;
     private static final int COMPONENT_BUFFER_SIZE = 5;
     private static final String CONTROLLER_BULLETIN_STORE_KEY = "CONTROLLER";
+    private static final String SERVICE_BULLETIN_STORE_KEY = "SERVICE";
+    private static final String REPORTING_TASK_BULLETIN_STORE_KEY = "REPORTING_TASK";
 
     private final ConcurrentMap<String, ConcurrentMap<String, RingBuffer<Bulletin>>> bulletinStoreMap = new ConcurrentHashMap<>();
     private volatile BulletinProcessingStrategy processingStrategy = new DefaultBulletinProcessingStrategy();
@@ -170,18 +172,39 @@ public class VolatileBulletinRepository implements BulletinRepository {
     public List<Bulletin> findBulletinsForController(final int max) {
         final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
 
-        final ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        if (componentMap == null) {
-            return Collections.<Bulletin>emptyList();
-        }
-
-        final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+        final Filter<Bulletin> filter = new Filter<Bulletin>() {
             @Override
             public boolean select(final Bulletin bulletin) {
                 return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
             }
-        }, max);
+        };
+
+        final List<Bulletin> controllerBulletins = new ArrayList<>();
+
+        final ConcurrentMap<String, RingBuffer<Bulletin>> controllerBulletinMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+        if (controllerBulletinMap != null) {
+            final RingBuffer<Bulletin> buffer = controllerBulletinMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+            if (buffer != null) {
+                controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+            }
+        }
+
+        for (final String key : new String[] { SERVICE_BULLETIN_STORE_KEY, REPORTING_TASK_BULLETIN_STORE_KEY }) {
+            final ConcurrentMap<String, RingBuffer<Bulletin>> bulletinMap = bulletinStoreMap.get(key);
+            if (bulletinMap != null) {
+                for (final RingBuffer<Bulletin> buffer : bulletinMap.values()) {
+                    controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+                }
+            }
+        }
+
+        // We only want the newest bulletin, so we sort based on time and take the top 'max' entries
+        Collections.sort(controllerBulletins);
+        if (controllerBulletins.size() > max) {
+            return controllerBulletins.subList(0, max);
+        }
+
+        return controllerBulletins;
     }
 
     /**
@@ -203,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
         this.processingStrategy = new DefaultBulletinProcessingStrategy();
     }
 
-    private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) {
+    private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) {
         final String storageKey = getBulletinStoreKey(bulletin);
 
         ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey);
@@ -215,40 +238,68 @@ public class VolatileBulletinRepository implements BulletinRepository {
             }
         }
 
-        final boolean controllerBulletin = isControllerBulletin(bulletin);
-        final String sourceId = controllerBulletin ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getSourceId();
-        RingBuffer<Bulletin> bulletinBuffer = componentMap.get(sourceId);
-        if (bulletinBuffer == null) {
-            final int bufferSize = controllerBulletin ? CONTROLLER_BUFFER_SIZE : COMPONENT_BUFFER_SIZE;
-            bulletinBuffer = new RingBuffer<>(bufferSize);
-            final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(sourceId, bulletinBuffer);
-            if (existingBuffer != null) {
-                bulletinBuffer = existingBuffer;
+        final List<RingBuffer<Bulletin>> buffers = new ArrayList<>(2);
+
+        if (isControllerBulletin(bulletin)) {
+            RingBuffer<Bulletin> bulletinBuffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+            if (bulletinBuffer == null) {
+                bulletinBuffer = new RingBuffer<>(CONTROLLER_BUFFER_SIZE);
+                final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(CONTROLLER_BULLETIN_STORE_KEY, bulletinBuffer);
+                if (existingBuffer != null) {
+                    bulletinBuffer = existingBuffer;
+                }
             }
+
+            buffers.add(bulletinBuffer);
         }
 
-        return bulletinBuffer;
+        if (bulletin.getSourceType() != ComponentType.FLOW_CONTROLLER) {
+            RingBuffer<Bulletin> bulletinBuffer = componentMap.get(bulletin.getSourceId());
+            if (bulletinBuffer == null) {
+                bulletinBuffer = new RingBuffer<>(COMPONENT_BUFFER_SIZE);
+                final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(bulletin.getSourceId(), bulletinBuffer);
+                if (existingBuffer != null) {
+                    bulletinBuffer = existingBuffer;
+                }
+            }
+
+            buffers.add(bulletinBuffer);
+        }
+
+        return buffers;
     }
 
     private String getBulletinStoreKey(final Bulletin bulletin) {
-        if (isControllerBulletin(bulletin)) {
-            return CONTROLLER_BULLETIN_STORE_KEY;
+        switch (bulletin.getSourceType()) {
+            case FLOW_CONTROLLER:
+                return CONTROLLER_BULLETIN_STORE_KEY;
+            case CONTROLLER_SERVICE:
+                return SERVICE_BULLETIN_STORE_KEY;
+            case REPORTING_TASK:
+                return REPORTING_TASK_BULLETIN_STORE_KEY;
+            default:
+                return bulletin.getGroupId();
         }
-
-        final String groupId = bulletin.getGroupId();
-        return groupId == null ? bulletin.getSourceId() : groupId;
     }
 
     private boolean isControllerBulletin(final Bulletin bulletin) {
-        return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
+        switch (bulletin.getSourceType()) {
+            case FLOW_CONTROLLER:
+            case CONTROLLER_SERVICE:
+            case REPORTING_TASK:
+                return true;
+            default:
+                return false;
+        }
     }
 
     private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {
 
         @Override
         public void update(final Bulletin bulletin) {
-            final RingBuffer<Bulletin> bulletinBuffer = getBulletinBuffer(bulletin);
-            bulletinBuffer.add(bulletin);
+            for (final RingBuffer<Bulletin> bulletinBuffer : getBulletinBuffers(bulletin)) {
+                bulletinBuffer.add(bulletin);
+            }
         }
     }
 }


[03/10] incubator-nifi git commit: NIFI-724: merged changes

Posted by mc...@apache.org.
NIFI-724: merged changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0a7ab1a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0a7ab1a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0a7ab1a0

Branch: refs/heads/develop
Commit: 0a7ab1a06dd8b7164413bdcd0a85a4c457d3da71
Parents: e240e07 f1adb8b
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jun 25 10:58:01 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jun 25 10:58:01 2015 -0400

----------------------------------------------------------------------

----------------------------------------------------------------------



[08/10] incubator-nifi git commit: NIFI-694: - Ensuring bulletins are not lost when updating a reporting task.

Posted by mc...@apache.org.
NIFI-694:
- Ensuring bulletins are not lost when updating a reporting task.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c3dff409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c3dff409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c3dff409

Branch: refs/heads/develop
Commit: c3dff409024d00208c68323ad1989bd2b93b7c0c
Parents: 4fd1e94
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jul 2 10:22:42 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 2 10:22:42 2015 -0400

----------------------------------------------------------------------
 .../src/main/webapp/js/nf/canvas/nf-reporting-task.js           | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c3dff409/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
index 998213e..da78e82 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
@@ -177,7 +177,10 @@ nf.ReportingTask = (function () {
         // get the table and update the row accordingly
         var reportingTaskGrid = $('#reporting-tasks-table').data('gridInstance');
         var reportingTaskData = reportingTaskGrid.getData();
-        reportingTaskData.updateItem(reportingTask.id, reportingTask);
+        var currentReportingTask = reportingTaskData.getItemById(reportingTask.id);
+        reportingTaskData.updateItem(reportingTask.id, $.extend({
+            bulletins: currentReportingTask.bulletins
+        }, reportingTask));
     };
     
     /**


[05/10] incubator-nifi git commit: NIFI-694: - Showing bulletins in the controller service and reporting task tables. - Fixed issue typo when emitting controller service bulletins.

Posted by mc...@apache.org.
NIFI-694:
- Showing bulletins in the controller service and reporting task tables.
- Fixed issue typo when emitting controller service bulletins.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/59aa8ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/59aa8ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/59aa8ffe

Branch: refs/heads/develop
Commit: 59aa8ffe100645498fdd4cea2214a203e3a3d433
Parents: 7a28903
Author: Matt Gilman <ma...@gmail.com>
Authored: Tue Jun 30 14:14:57 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jun 30 14:14:57 2015 -0400

----------------------------------------------------------------------
 .../apache/nifi/reporting/BulletinQuery.java    |  12 ++
 .../web/api/dto/status/ControllerStatusDTO.java |  33 +++-
 .../nifi/events/VolatileBulletinRepository.java |  13 +-
 .../logging/ControllerServiceLogObserver.java   |   5 +-
 .../nifi/web/StandardNiFiServiceFacade.java     |  19 +-
 .../org/apache/nifi/web/api/dto/DtoFactory.java |  22 ++-
 .../nifi/web/controller/ControllerFacade.java   |  43 ++--
 .../src/main/webapp/css/controller-service.css  |   9 +-
 .../src/main/webapp/js/nf/canvas/nf-canvas.js   |   3 +
 .../js/nf/canvas/nf-controller-service.js       |   9 +-
 .../src/main/webapp/js/nf/canvas/nf-settings.js | 194 +++++++++++++++++--
 11 files changed, 313 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
index cb5d7b3..7ba2089 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
@@ -23,6 +23,7 @@ import java.util.regex.Pattern;
  */
 public class BulletinQuery {
 
+    private final ComponentType sourceType;
     private final Pattern sourceIdPattern;
     private final Pattern groupIdPattern;
     private final Pattern namePattern;
@@ -31,6 +32,7 @@ public class BulletinQuery {
     private final Integer limit;
 
     private BulletinQuery(final Builder builder) {
+        this.sourceType = builder.sourceType;
         this.sourceIdPattern = builder.sourceIdPattern == null ? null : Pattern.compile(builder.sourceIdPattern);
         this.groupIdPattern = builder.groupIdPattern == null ? null : Pattern.compile(builder.groupIdPattern);
         this.namePattern = builder.namePattern == null ? null : Pattern.compile(builder.namePattern);
@@ -39,6 +41,10 @@ public class BulletinQuery {
         this.limit = builder.limit;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
     public Pattern getSourceIdPattern() {
         return sourceIdPattern;
     }
@@ -65,6 +71,7 @@ public class BulletinQuery {
 
     public static class Builder {
 
+        private ComponentType sourceType;
         private String sourceIdPattern;
         private String groupIdPattern;
         private String namePattern;
@@ -87,6 +94,11 @@ public class BulletinQuery {
             return this;
         }
 
+        public Builder sourceType(ComponentType sourceType) {
+            this.sourceType = sourceType;
+            return this;
+        }
+
         public Builder nameMatches(String name) {
             this.namePattern = name;
             return this;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
index 7afc7bc..5d5eddf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
@@ -40,6 +40,8 @@ public class ControllerStatusDTO {
     private Integer inactiveRemotePortCount;
 
     private List<BulletinDTO> bulletins;
+    private List<BulletinDTO> controllerServiceBulletins;
+    private List<BulletinDTO> reportingTaskBulletins;
 
     /**
      * The active thread count.
@@ -72,7 +74,8 @@ public class ControllerStatusDTO {
     }
 
     /**
-     * @return Used in clustering, will report the number of nodes connected vs the number of nodes in the cluster
+     * @return Used in clustering, will report the number of nodes connected vs
+     * the number of nodes in the cluster
      */
     @ApiModelProperty(
             value = "When clustered, reports the number of nodes connected vs the number of nodes in the cluster."
@@ -100,6 +103,34 @@ public class ControllerStatusDTO {
     }
 
     /**
+     * @return Controller service bulletins to be reported to the user
+     */
+    @ApiModelProperty(
+            value = "Controller service bulletins to be reported to the user."
+    )
+    public List<BulletinDTO> getControllerServiceBulletins() {
+        return controllerServiceBulletins;
+    }
+
+    public void setControllerServiceBulletins(List<BulletinDTO> controllerServiceBulletins) {
+        this.controllerServiceBulletins = controllerServiceBulletins;
+    }
+
+    /**
+     * @return Reporting task bulletins to be reported to the user
+     */
+    @ApiModelProperty(
+            value = "Reporting task bulletins to be reported to the user."
+    )
+    public List<BulletinDTO> getReportingTaskBulletins() {
+        return reportingTaskBulletins;
+    }
+
+    public void setReportingTaskBulletins(List<BulletinDTO> reportingTaskBulletins) {
+        this.reportingTaskBulletins = reportingTaskBulletins;
+    }
+
+    /**
      * @return whether or not there are pending user requests
      */
     @ApiModelProperty(

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 5172d34..c18fffd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -102,6 +102,14 @@ public class VolatileBulletinRepository implements BulletinRepository {
                     }
                 }
 
+                // if a source component type was specified see if it should be excluded
+                if (bulletinQuery.getSourceType() != null) {
+                    // exclude if this bulletin source type doesn't match
+                    if (bulletin.getSourceType() == null || !bulletinQuery.getSourceType().equals(bulletin.getSourceType())) {
+                        return false;
+                    }
+                }
+
                 return true;
             }
         };
@@ -177,8 +185,9 @@ public class VolatileBulletinRepository implements BulletinRepository {
     }
 
     /**
-     * Overrides the default bulletin processing strategy. When a custom bulletin strategy is employed, bulletins will not be persisted in this repository and will sent to the specified strategy
-     * instead.
+     * Overrides the default bulletin processing strategy. When a custom
+     * bulletin strategy is employed, bulletins will not be persisted in this
+     * repository and will sent to the specified strategy instead.
      *
      * @param strategy bulletin strategy
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
index 837e1c4..d9eaa12 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -24,6 +24,7 @@ import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 
 public class ControllerServiceLogObserver implements LogObserver {
+
     private final BulletinRepository bulletinRepository;
     private final ControllerServiceNode serviceNode;
 
@@ -38,8 +39,8 @@ public class ControllerServiceLogObserver implements LogObserver {
         // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
         final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
 
-        final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.REPORTING_TASK,
-            serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.CONTROLLER_SERVICE,
+                serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
         bulletinRepository.addBulletin(bulletin);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 903b872..4e50ff3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -159,6 +159,7 @@ import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.controller.service.ControllerServiceReference;
 import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.web.api.dto.ControllerServiceDTO;
 import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
 import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
@@ -1578,12 +1579,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
 
             // get the bulletins for the controller
             final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository();
-            final List<Bulletin> results = bulletinRepository.findBulletinsForController();
-            final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
-            for (final Bulletin bulletin : results) {
-                bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
-            }
-            controllerStatus.setBulletins(bulletinDtos);
+            controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
+
+            // get the controller service bulletins
+            final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+            controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
+
+            // get the reporting task bulletins
+            final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+            controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
 
             // get the component counts by extracting them from the roots' group status
             final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root");
@@ -3019,7 +3023,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
     }
 
     /**
-     * Utility method for extracting component counts from the specified group status.
+     * Utility method for extracting component counts from the specified group
+     * status.
      */
     private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) {
         int running = 0;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 54f11fb..76bce6f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1487,6 +1487,20 @@ public final class DtoFactory {
     }
 
     /**
+     * Creates BulletinDTOs for the specified Bulletins.
+     *
+     * @param bulletins bulletin
+     * @return dto
+     */
+    public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) {
+        final List<BulletinDTO> bulletinDtos = new ArrayList<>(bulletins.size());
+        for (final Bulletin bulletin : bulletins) {
+            bulletinDtos.add(createBulletinDto(bulletin));
+        }
+        return bulletinDtos;
+    }
+
+    /**
      * Creates a BulletinDTO for the specified Bulletin.
      *
      * @param bulletin bulletin
@@ -1507,7 +1521,8 @@ public final class DtoFactory {
     }
 
     /**
-     * Creates a ProvenanceEventNodeDTO for the specified ProvenanceEventLineageNode.
+     * Creates a ProvenanceEventNodeDTO for the specified
+     * ProvenanceEventLineageNode.
      *
      * @param node node
      * @return dto
@@ -2158,8 +2173,9 @@ public final class DtoFactory {
     /**
      *
      * @param original orig
-     * @param deep if <code>true</code>, all Connections, ProcessGroups, Ports, Processors, etc. will be copied. If <code>false</code>, the copy will have links to the same objects referenced by
-     * <code>original</code>.
+     * @param deep if <code>true</code>, all Connections, ProcessGroups, Ports,
+     * Processors, etc. will be copied. If <code>false</code>, the copy will
+     * have links to the same objects referenced by <code>original</code>.
      *
      * @return dto
      */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index b614f0a..8bf5553 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -75,7 +75,6 @@ import org.apache.nifi.provenance.search.SearchTerm;
 import org.apache.nifi.provenance.search.SearchTerms;
 import org.apache.nifi.provenance.search.SearchableField;
 import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.ReportingTask;
 import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -88,7 +87,6 @@ import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
 import org.apache.nifi.web.NiFiCoreException;
 import org.apache.nifi.web.ResourceNotFoundException;
-import org.apache.nifi.web.api.dto.BulletinDTO;
 import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
 import org.apache.nifi.web.api.dto.DtoFactory;
 import org.apache.nifi.web.api.dto.provenance.AttributeDTO;
@@ -113,6 +111,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.admin.service.UserService;
 import org.apache.nifi.authorization.DownloadAuthorization;
 import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.web.security.user.NiFiUserUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -441,12 +441,15 @@ public class ControllerFacade {
         controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount()) + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
 
         final BulletinRepository bulletinRepository = getBulletinRepository();
-        final List<Bulletin> results = bulletinRepository.findBulletinsForController();
-        final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
-        for (final Bulletin bulletin : results) {
-            bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
-        }
-        controllerStatus.setBulletins(bulletinDtos);
+        controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
+
+        // get the controller service bulletins
+        final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+        controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
+
+        // get the reporting task bulletins
+        final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+        controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
 
         final ProcessGroupCounts counts = rootGroup.getCounts();
         controllerStatus.setRunningCount(counts.getRunningCount());
@@ -494,36 +497,44 @@ public class ControllerFacade {
     }
 
     /**
-     * Returns the socket port that the Cluster Manager is listening on for Site-to-Site communications
+     * Returns the socket port that the Cluster Manager is listening on for
+     * Site-to-Site communications
      *
-     * @return the socket port that the Cluster Manager is listening on for Site-to-Site communications
+     * @return the socket port that the Cluster Manager is listening on for
+     * Site-to-Site communications
      */
     public Integer getClusterManagerRemoteSiteListeningPort() {
         return flowController.getClusterManagerRemoteSiteListeningPort();
     }
 
     /**
-     * Indicates whether or not Site-to-Site communications with the Cluster Manager are secure
+     * Indicates whether or not Site-to-Site communications with the Cluster
+     * Manager are secure
      *
-     * @return whether or not Site-to-Site communications with the Cluster Manager are secure
+     * @return whether or not Site-to-Site communications with the Cluster
+     * Manager are secure
      */
     public Boolean isClusterManagerRemoteSiteCommsSecure() {
         return flowController.isClusterManagerRemoteSiteCommsSecure();
     }
 
     /**
-     * Returns the socket port that the local instance is listening on for Site-to-Site communications
+     * Returns the socket port that the local instance is listening on for
+     * Site-to-Site communications
      *
-     * @return the socket port that the local instance is listening on for Site-to-Site communications
+     * @return the socket port that the local instance is listening on for
+     * Site-to-Site communications
      */
     public Integer getRemoteSiteListeningPort() {
         return flowController.getRemoteSiteListeningPort();
     }
 
     /**
-     * Indicates whether or not Site-to-Site communications with the local instance are secure
+     * Indicates whether or not Site-to-Site communications with the local
+     * instance are secure
      *
-     * @return whether or not Site-to-Site communications with the local instance are secure
+     * @return whether or not Site-to-Site communications with the local
+     * instance are secure
      */
     public Boolean isRemoteSiteCommsSecure() {
         return flowController.isRemoteSiteCommsSecure();

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
index 4a82810..9d87994 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
@@ -133,9 +133,12 @@ div.referencing-component-bulletins {
     display: none;
 }
 
-span.service.expansion-button {
+div.service.expansion-button {
+    float: left;
     margin-right: 4px;
     margin-top: 2px;
+    width: 10px;
+    height: 10px;
 }
 
 span.referencing-component-active-thread-count {
@@ -197,6 +200,8 @@ div.referencing-component-references {
 
 #disable-controller-service-name {
     float: left;
+    max-width: 280px;
+    text-overflow: ellipsis;
 }
 
 #disable-controller-service-bulletins {
@@ -246,6 +251,8 @@ div.referencing-component-references {
 
 #enable-controller-service-name {
     float: left;
+    max-width: 280px;
+    text-overflow: ellipsis;
 }
 
 #enable-controller-service-bulletins {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
index ca45a3d..06ef89f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
@@ -833,6 +833,9 @@ nf.Canvas = (function () {
                         bulletinIcon.show();
                     }
                 }
+                
+                // update controller service and reporting task bulletins
+                nf.Settings.setBulletins(controllerStatus.controllerServiceBulletins, controllerStatus.reportingTaskBulletins);
 
                 // handle any pending user request
                 if (controllerStatus.hasPendingAccounts === true) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index baa9edc..efcf9fb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -149,7 +149,10 @@ nf.ControllerService = (function () {
         // get the table and update the row accordingly
         var controllerServiceGrid = $('#controller-services-table').data('gridInstance');
         var controllerServiceData = controllerServiceGrid.getData();
-        controllerServiceData.updateItem(controllerService.id, controllerService);
+        var currentControllerService = controllerServiceData.getItemById(controllerService.id);
+        controllerServiceData.updateItem(controllerService.id, $.extend({
+            bulletins: currentControllerService.bulletins
+        }, controllerService));
     };
     
     /**
@@ -426,7 +429,7 @@ nf.ControllerService = (function () {
                 
                 // container for this service's references
                 var referencingServiceReferencesContainer = $('<div class="referencing-component-references hidden"></div>');
-                var serviceTwist = $('<span class="service expansion-button collapsed pointer"></span>').on('click', function() {
+                var serviceTwist = $('<div class="service expansion-button collapsed pointer"></div>').on('click', function() {
                     if (serviceTwist.hasClass('collapsed')) {
                         var controllerServiceGrid = $('#controller-services-table').data('gridInstance');
                         var controllerServiceData = controllerServiceGrid.getData();
@@ -511,7 +514,7 @@ nf.ControllerService = (function () {
                 return;
             }
             
-            var twist = $('<span class="expansion-button expanded"></span>');
+            var twist = $('<div class="expansion-button expanded"></div>');
             var title = $('<span class="referencing-component-title"></span>').text(titleText);
             var count = $('<span class="referencing-component-count"></span>').text('(' + list.children().length + ')');
             

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index d323892..3715110 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-/* global nf, Slick */
+/* global nf, Slick, d3 */
 
 nf.Settings = (function () {
 
@@ -600,9 +600,21 @@ nf.Settings = (function () {
         // more details formatter
         var moreControllerServiceDetails = function (row, cell, value, columnDef, dataContext) {
             var markup = '<img src="images/iconDetails.png" title="View Details" class="pointer view-controller-service" style="margin-top: 5px; float: left;" />';
-            if (!nf.Common.isEmpty(dataContext.validationErrors)) {
-                markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 4px; float: left;" /><span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
+            var hasErrors = !nf.Common.isEmpty(dataContext.validationErrors);
+            var hasBulletins = !nf.Common.isEmpty(dataContext.bulletins);
+            
+            if (hasErrors) {
+                markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 1px; float: left;" />';
+            }
+            
+            if (hasBulletins) {
+                markup += '<img src="images/iconBulletin.png" class="has-bulletins" style="margin-top: 5px; margin-left: 5px; float: left;"/>';
+            }
+            
+            if (hasErrors || hasBulletins) {
+                markup += '<span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
             }
+            
             return markup;
         };
         
@@ -642,7 +654,12 @@ nf.Settings = (function () {
                     markup += '<img src="images/iconDisable.png" title="Disable" class="pointer disable-controller-service" style="margin-top: 2px;" />&nbsp;';
                 } else if (dataContext.state === 'DISABLED') {
                     markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-controller-service" style="margin-top: 2px;" />&nbsp;';
-                    markup += '<img src="images/iconEnable.png" title="Enable" class="pointer enable-controller-service" style="margin-top: 2px;"/>&nbsp;';
+                    
+                    // if there are no validation errors allow enabling
+                    if (nf.Common.isEmpty(dataContext.validationErrors)) {
+                        markup += '<img src="images/iconEnable.png" title="Enable" class="pointer enable-controller-service" style="margin-top: 2px;"/>&nbsp;';
+                    }
+                    
                     markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-controller-service" style="margin-top: 2px;" />&nbsp;';
                 }
             }
@@ -655,7 +672,7 @@ nf.Settings = (function () {
 
         // define the column model for the controller services table
         var controllerServicesColumns = [
-            {id: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreControllerServiceDetails, sortable: false, width: 50, maxWidth: 50},
+            {id: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreControllerServiceDetails, sortable: false, width: 65, maxWidth: 65},
             {id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true},
             {id: 'type', field: 'type', name: 'Type', formatter: typeFormatter, sortable: true, resizable: true},
             {id: 'state', field: 'state', name: 'State', formatter: controllerServiceStateFormatter, sortable: true, resizeable: true}
@@ -767,6 +784,34 @@ nf.Settings = (function () {
                     }, nf.Common.config.tooltipConfig));
                 }
             }
+            
+            var bulletinIcon = $(this).find('img.has-bulletins');
+            if (bulletinIcon.length && !bulletinIcon.data('qtip')) {
+                var taskId = $(this).find('span.row-id').text();
+
+                // get the task item
+                var item = controllerServicesData.getItemById(taskId);
+                
+                // format the tooltip
+                var bulletins = nf.Common.getFormattedBulletins(item.bulletins);
+                var tooltip = nf.Common.formatUnorderedList(bulletins);
+
+                // show the tooltip
+                if (nf.Common.isDefinedAndNotNull(tooltip)) {
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
+                        content: tooltip,
+                        position: {
+                            target: 'mouse',
+                            viewport: $(window),
+                            adjust: {
+                                x: 8,
+                                y: 8,
+                                method: 'flipinvert flipinvert'
+                            }
+                        }
+                    }));
+                }
+            }
         });
     };
 
@@ -785,7 +830,9 @@ nf.Settings = (function () {
             var nodeServices = response.controllerServices;
             if (nf.Common.isDefinedAndNotNull(nodeServices)) {
                 $.each(nodeServices, function (_, nodeService) {
-                    services.push(nodeService);
+                    services.push($.extend({
+                        bulletins: []
+                    }, nodeService));
                 });
             }
         });
@@ -801,7 +848,9 @@ nf.Settings = (function () {
                     var ncmServices = response.controllerServices;
                     if (nf.Common.isDefinedAndNotNull(ncmServices)) {
                         $.each(ncmServices, function (_, ncmService) {
-                            services.push(ncmService);
+                            services.push($.extend({
+                                bulletins: []
+                            }, ncmService));
                         });
                     }
                     deferred.resolve();
@@ -817,6 +866,7 @@ nf.Settings = (function () {
         return $.when(nodeControllerServices, ncmControllerServices).done(function () {
             var controllerServicesElement = $('#controller-services-table');
             nf.Common.cleanUpTooltips(controllerServicesElement, 'img.has-errors');
+            nf.Common.cleanUpTooltips(controllerServicesElement, 'img.has-bulletins');
 
             var controllerServicesGrid = controllerServicesElement.data('gridInstance');
             var controllerServicesData = controllerServicesGrid.getData();
@@ -1198,9 +1248,21 @@ nf.Settings = (function () {
 
         var moreReportingTaskDetails = function (row, cell, value, columnDef, dataContext) {
             var markup = '<img src="images/iconDetails.png" title="View Details" class="pointer view-reporting-task" style="margin-top: 5px; float: left;" />';
-            if (!nf.Common.isEmpty(dataContext.validationErrors)) {
-                markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 4px; float: left" /><span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
+            var hasErrors = !nf.Common.isEmpty(dataContext.validationErrors);
+            var hasBulletins = !nf.Common.isEmpty(dataContext.bulletins);
+            
+            if (hasErrors) {
+                markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 1px; float: left;" />';
             }
+            
+            if (hasBulletins) {
+                markup += '<img src="images/iconBulletin.png" class="has-bulletins" style="margin-top: 5px; margin-left: 5px; float: left;"/>';
+            }
+            
+            if (hasErrors || hasBulletins) {
+                markup += '<span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
+            }
+            
             return markup;
         };
         
@@ -1239,7 +1301,12 @@ nf.Settings = (function () {
                     markup += '<img src="images/iconStop.png" title="Stop" class="pointer stop-reporting-task" style="margin-top: 2px;" />&nbsp;';
                 } else if (dataContext.state === 'STOPPED' || dataContext.state === 'DISABLED') {
                     markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-reporting-task" style="margin-top: 2px;" />&nbsp;';
-                    markup += '<img src="images/iconRun.png" title="Start" class="pointer start-reporting-task" style="margin-top: 2px;"/>&nbsp;';
+                 
+                    // support starting when stopped and no validation errors
+                    if (dataContext.state === 'STOPPED' && nf.Common.isEmpty(dataContext.validationErrors)) {
+                        markup += '<img src="images/iconRun.png" title="Start" class="pointer start-reporting-task" style="margin-top: 2px;"/>&nbsp;';
+                    }
+                    
                     markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-reporting-task" style="margin-top: 2px;" />&nbsp;';
                 }
             }
@@ -1252,7 +1319,7 @@ nf.Settings = (function () {
 
         // define the column model for the reporting tasks table
         var reportingTasksColumnModel = [
-            {id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreReportingTaskDetails, sortable: true, width: 50, maxWidth: 50},
+            {id: 'moreDetails', field: 'moreDetails', name: '&nbsp;', resizable: false, formatter: moreReportingTaskDetails, sortable: true, width: 65, maxWidth: 65},
             {id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true},
             {id: 'type', field: 'type', name: 'Type', sortable: true, resizable: true, formatter: typeFormatter},
             {id: 'state', field: 'state', name: 'Run Status', sortable: true, resizeable: true, formatter: reportingTaskRunStatusFormatter}
@@ -1342,7 +1409,7 @@ nf.Settings = (function () {
             if (errorIcon.length && !errorIcon.data('qtip')) {
                 var taskId = $(this).find('span.row-id').text();
 
-                // get the service item
+                // get the task item
                 var item = reportingTasksData.getItemById(taskId);
 
                 // format the errors
@@ -1364,6 +1431,34 @@ nf.Settings = (function () {
                     }, nf.Common.config.tooltipConfig));
                 }
             }
+            
+            var bulletinIcon = $(this).find('img.has-bulletins');
+            if (bulletinIcon.length && !bulletinIcon.data('qtip')) {
+                var taskId = $(this).find('span.row-id').text();
+
+                // get the task item
+                var item = reportingTasksData.getItemById(taskId);
+                
+                // format the tooltip
+                var bulletins = nf.Common.getFormattedBulletins(item.bulletins);
+                var tooltip = nf.Common.formatUnorderedList(bulletins);
+
+                // show the tooltip
+                if (nf.Common.isDefinedAndNotNull(tooltip)) {
+                    bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
+                        content: tooltip,
+                        position: {
+                            target: 'mouse',
+                            viewport: $(window),
+                            adjust: {
+                                x: 8,
+                                y: 8,
+                                method: 'flipinvert flipinvert'
+                            }
+                        }
+                    }));
+                }
+            }
         });
     };
 
@@ -1382,7 +1477,9 @@ nf.Settings = (function () {
             var nodeTasks = response.reportingTasks;
             if (nf.Common.isDefinedAndNotNull(nodeTasks)) {
                 $.each(nodeTasks, function (_, nodeTask) {
-                    tasks.push(nodeTask);
+                    tasks.push($.extend({
+                        bulletins: []
+                    }, nodeTask));
                 });
             }
         });
@@ -1414,6 +1511,7 @@ nf.Settings = (function () {
         return $.when(nodeReportingTasks, ncmReportingTasks).done(function () {
             var reportingTasksElement = $('#reporting-tasks-table');
             nf.Common.cleanUpTooltips(reportingTasksElement, 'img.has-errors');
+            nf.Common.cleanUpTooltips(reportingTasksElement, 'img.has-bulletins');
 
             var reportingTasksGrid = reportingTasksElement.data('gridInstance');
             var reportingTasksData = reportingTasksGrid.getData();
@@ -1580,7 +1678,75 @@ nf.Settings = (function () {
             var reportingTasks = loadReportingTasks();
 
             // return a deferred for all parts of the settings
-            return $.when(settings, controllerServices, reportingTasks).fail(nf.Common.handleAjaxError);
+            return $.when(settings, controllerServices, reportingTasks).done(nf.Canvas.reloadStatus).fail(nf.Common.handleAjaxError);
+        },
+        
+        /**
+         * Sets the controller service and reporting task bulletins in their respective tables.
+         * 
+         * @param {object} controllerServiceBulletins
+         * @param {object} reportingTaskBulletins
+         */
+        setBulletins: function(controllerServiceBulletins, reportingTaskBulletins) {
+            // controller services
+            var controllerServicesGrid = $('#controller-services-table').data('gridInstance');
+            var controllerServicesData = controllerServicesGrid.getData();
+            controllerServicesData.beginUpdate();
+
+            // if there are some bulletins process them
+            if (!nf.Common.isEmpty(controllerServiceBulletins)) {
+                var controllerServiceBulletinsBySource = d3.nest()
+                    .key(function(d) { return d.sourceId; })
+                    .map(controllerServiceBulletins, d3.map);
+            
+                controllerServiceBulletinsBySource.forEach(function(sourceId, sourceBulletins) {
+                    var controllerService = controllerServicesData.getItemById(sourceId);
+                    if (nf.Common.isDefinedAndNotNull(controllerService)) {
+                        controllerServicesData.updateItem(sourceId, $.extend(controllerService, {
+                            bulletins: sourceBulletins
+                        }));
+                    }
+                });
+            } else {
+                // if there are no bulletins clear all
+                var controllerServices = controllerServicesData.getItems();
+                $.each(controllerServices, function(_, controllerService) {
+                    controllerServicesData.updateItem(controllerService.id, $.extend(controllerService, {
+                        bulletins: []
+                    }));
+                });
+            }
+            controllerServicesData.endUpdate();
+
+            // reporting tasks
+            var reportingTasksGrid = $('#reporting-tasks-table').data('gridInstance');
+            var reportingTasksData = reportingTasksGrid.getData();
+            reportingTasksData.beginUpdate();
+            
+            // if there are some bulletins process them
+            if (!nf.Common.isEmpty(reportingTaskBulletins)) {
+                var reportingTaskBulletinsBySource = d3.nest()
+                    .key(function(d) { return d.sourceId; })
+                    .map(reportingTaskBulletins, d3.map);
+
+                reportingTaskBulletinsBySource.forEach(function(sourceId, sourceBulletins) {
+                    var reportingTask = reportingTasksData.getItemById(sourceId);
+                    if (nf.Common.isDefinedAndNotNull(reportingTask)) {
+                        reportingTasksData.updateItem(sourceId, $.extend(reportingTask, {
+                            bulletins: sourceBulletins
+                        }));
+                    }
+                });
+            } else {
+                // if there are no bulletins clear all
+                var reportingTasks = reportingTasksData.getItems();
+                $.each(reportingTasks, function(_, reportingTask) {
+                    controllerServicesData.updateItem(reportingTask.id, $.extend(reportingTask, {
+                        bulletins: []
+                    }));
+                });
+            }
+            reportingTasksData.endUpdate();
         }
     };
 }());
\ No newline at end of file



[09/10] incubator-nifi git commit: NIFI-695: - Providing immediate feedback when canceling a request to [en|dis]able a controller service. - Reporting to the user after a request has been canceled.

Posted by mc...@apache.org.
NIFI-695:
- Providing immediate feedback when canceling a request to [en|dis]able a controller service.
- Reporting to the user after a request has been canceled.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/566c4fe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/566c4fe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/566c4fe8

Branch: refs/heads/develop
Commit: 566c4fe8d78b769fc31c8d5bed90daccbc817e27
Parents: c3dff40
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jul 2 13:45:14 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 2 13:45:14 2015 -0400

----------------------------------------------------------------------
 .../disable-controller-service-dialog.jsp       |  3 +++
 .../canvas/enable-controller-service-dialog.jsp |  3 +++
 .../src/main/webapp/css/controller-service.css  |  6 +++++
 .../js/nf/canvas/nf-controller-service.js       | 24 ++++++++++++++++++++
 4 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
index 1ecb730..0c77a10 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
@@ -70,4 +70,7 @@
             </div>
         </div>
     </div>
+    <div class="controller-service-canceling hidden unset">
+        Canceling...
+    </div>
 </div>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
index 6613a04..0121e21 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
@@ -69,4 +69,7 @@
             </div>
         </div>
     </div>
+    <div class="controller-service-canceling hidden unset">
+        Canceling...
+    </div>
 </div>

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
index 9d87994..b0d980b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
@@ -161,6 +161,12 @@ div.referencing-component-references {
     clear: left;
 }
 
+div.controller-service-canceling {
+    position: absolute;
+    bottom: 10px;
+    right: 8px;
+}
+
 /*
     Comments
 */

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index 078064a..75c7bee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -1032,6 +1032,8 @@ nf.ControllerService = (function () {
             handler: {
                 click: function () {
                     canceled = true;
+                    disableDialog.modal('setButtonModel', []);
+                    $('#disable-controller-service-dialog div.controller-service-canceling').show();
                 }
             }
         }]);
@@ -1053,6 +1055,7 @@ nf.ControllerService = (function () {
         
         // sets the close button on the dialog
         var setCloseButton = function () {
+            $('#disable-controller-service-dialog div.controller-service-canceling').hide();
             disableDialog.modal('setButtonModel', [{
                 buttonText: 'Close',
                 handler: {
@@ -1100,6 +1103,15 @@ nf.ControllerService = (function () {
         }).always(function () {
             reloadControllerServiceAndReferencingComponents(controllerService);
             setCloseButton();
+            
+            // inform the user if the action was canceled
+            if (canceled === true && $('#nf-ok-dialog').not(':visible')) {
+                nf.Dialog.showOkDialog({
+                    overlayBackground: false,
+                    headerText: 'Action Canceled',
+                    dialogContent: 'The request to disable has been canceled. Parts of this request may have already completed. Please verify the state of this service and all referencing components.'
+                });
+            }
         });
     };
     
@@ -1116,6 +1128,8 @@ nf.ControllerService = (function () {
             handler: {
                 click: function () {
                     canceled = true;
+                    enableDialog.modal('setButtonModel', []);
+                    $('#enable-controller-service-dialog div.controller-service-canceling').show();
                 }
             }
         }]);
@@ -1143,6 +1157,7 @@ nf.ControllerService = (function () {
 
         // sets the button to close
         var setCloseButton = function () {
+            $('#enable-controller-service-dialog div.controller-service-canceling').hide();
             enableDialog.modal('setButtonModel', [{
                 buttonText: 'Close',
                 handler: {
@@ -1200,6 +1215,15 @@ nf.ControllerService = (function () {
         }).always(function () {
             reloadControllerServiceAndReferencingComponents(controllerService);
             setCloseButton();
+            
+            // inform the user if the action was canceled
+            if (canceled === true && $('#nf-ok-dialog').not(':visible')) {
+                nf.Dialog.showOkDialog({
+                    overlayBackground: false,
+                    headerText: 'Action Canceled',
+                    dialogContent: 'The request to enable has been canceled. Parts of this request may have already completed. Please verify the state of this service and all referencing components.'
+                });
+            }
         });
     };
     


[04/10] incubator-nifi git commit: NIFI-694: - Showing bulletins for services and referencing components when enabling and disabling.

Posted by mc...@apache.org.
NIFI-694:
- Showing bulletins for services and referencing components when enabling and disabling.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7a289034
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7a289034
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7a289034

Branch: refs/heads/develop
Commit: 7a28903402192819accaded9a90adfdfad852776
Parents: 0a7ab1a
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jun 25 14:38:47 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jun 25 14:38:47 2015 -0400

----------------------------------------------------------------------
 .../disable-controller-service-dialog.jsp       |   4 +-
 .../canvas/enable-controller-service-dialog.jsp |   4 +-
 .../src/main/webapp/css/controller-service.css  |  35 +++
 .../js/nf/canvas/nf-controller-service.js       | 222 ++++++++++++++++---
 4 files changed, 230 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
index dc76282..1ecb730 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
@@ -22,7 +22,9 @@
                 <div class="setting-name">Service</div>
                 <div class="setting-field">
                     <span id="disable-controller-service-id" class="hidden"></span>
-                    <span id="disable-controller-service-name"></span>
+                    <div id="disable-controller-service-name"></div>
+                    <div id="disable-controller-service-bulletins"></div>
+                    <div class="clear"></div>
                 </div>
             </div>
             <div id="disable-controller-service-scope-container" class="setting">

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
index 14fe658..6613a04 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
@@ -22,7 +22,9 @@
                 <div class="setting-name">Service</div>
                 <div class="setting-field">
                     <span id="enable-controller-service-id" class="hidden"></span>
-                    <span id="enable-controller-service-name"></span>
+                    <div id="enable-controller-service-name"></div>
+                    <div id="enable-controller-service-bulletins"></div>
+                    <div class="clear"></div>
                 </div>
             </div>
             <div id="enable-controller-service-scope-container" class="setting">

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
index 3d722cd..4a82810 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
@@ -124,6 +124,15 @@ div.referencing-component-state {
     margin-top: -2px;
 }
 
+div.referencing-component-bulletins {
+    float: left;
+    margin-left: 5px;
+    width: 12px;
+    height: 12px;
+    background-color: transparent;
+    display: none;
+}
+
 span.service.expansion-button {
     margin-right: 4px;
     margin-top: 2px;
@@ -186,6 +195,19 @@ div.referencing-component-references {
     margin-right: 40px;
 }
 
+#disable-controller-service-name {
+    float: left;
+}
+
+#disable-controller-service-bulletins {
+    float: left;
+    margin-left: 5px;
+    width: 12px;
+    height: 12px;
+    background-color: transparent;
+    display: none;
+}
+
 #disable-controller-service-referencing-components {
     border: 0 solid #CCCCCC;
     padding: 2px;
@@ -222,6 +244,19 @@ div.referencing-component-references {
     margin-right: 40px;
 }
 
+#enable-controller-service-name {
+    float: left;
+}
+
+#enable-controller-service-bulletins {
+    float: left;
+    margin-left: 5px;
+    width: 12px;
+    height: 12px;
+    background-color: transparent;
+    display: none;
+}
+
 #enable-controller-service-scope {
     float: left;
     width: 225px;

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index e8a111d..baa9edc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -241,6 +241,40 @@ nf.ControllerService = (function () {
     };
     
     /**
+     * Updates the specified bulletinIcon with the specified bulletins if necessary.
+     * 
+     * @param {array} bulletins
+     * @param {jQuery} bulletinIcon
+     */
+    var updateBulletins = function (bulletins, bulletinIcon) {
+        var currentBulletins = bulletinIcon.data('bulletins');
+        
+        // update the bulletins if necessary
+        if (nf.Common.doBulletinsDiffer(currentBulletins, bulletins)) {
+            bulletinIcon.data('bulletins', bulletins);
+            
+            // format the new bulletins
+            var formattedBulletins = nf.Common.getFormattedBulletins(bulletins);
+            
+            // if there are bulletins update them
+            if (bulletins.length > 0) {
+                var list = nf.Common.formatUnorderedList(formattedBulletins);
+
+                // update existing tooltip or initialize a new one if appropriate
+                if (bulletinIcon.data('qtip')) {
+                    bulletinIcon.qtip('option', 'content.text', list);
+                } else {
+                    bulletinIcon.addClass('has-bulletins').show().qtip($.extend({
+                        content: list
+                    }, nf.CanvasUtils.config.systemTooltipConfig));
+                }
+            } else if (bulletinIcon.data('qtip')) {
+                bulletinIcon.removeClass('has-bulletins').removeData('bulletins').hide().qtip('api').destroy(true);
+            }
+        }
+    };
+    
+    /**
      * Updates the referencingComponentState using the specified referencingComponent.
      * 
      * @param {jQuery} referencingComponentState
@@ -301,6 +335,23 @@ nf.ControllerService = (function () {
     };
     
     /**
+     * Updates the bulletins for all referencing components.
+     * 
+     * @param {array} bulletins
+     */
+    var updateReferencingComponentBulletins = function(bulletins) {
+        var bulletinsBySource = d3.nest()
+            .key(function(d) { return d.sourceId; })
+            .map(bulletins, d3.map);
+
+        bulletinsBySource.forEach(function(sourceId, sourceBulletins) {
+            $('div.' + sourceId + '-bulletins').each(function() {
+                updateBulletins(sourceBulletins, $(this));
+            });
+        });
+    };
+    
+    /**
      * Adds the specified reference for this controller service.
      * 
      * @param {jQuery} referenceContainer 
@@ -323,10 +374,13 @@ nf.ControllerService = (function () {
             }
         };
         
+        var referencingComponentIds = [];
         var processors = $('<ul class="referencing-component-listing clear"></ul>');
         var services = $('<ul class="referencing-component-listing clear"></ul>');
         var tasks = $('<ul class="referencing-component-listing clear"></ul>');
         $.each(referencingComponents, function (_, referencingComponent) {
+            referencingComponentIds.push(referencingComponent.id);
+            
             if (referencingComponent.referenceType === 'Processor') {
                 var processorLink = $('<span class="referencing-component-name link"></span>').text(referencingComponent.name).on('click', function () {
                     // show the component
@@ -340,6 +394,9 @@ nf.ControllerService = (function () {
                 // state
                 var processorState = $('<div class="referencing-component-state"></div>').addClass(referencingComponent.id + '-state');
                 updateReferencingSchedulableComponentState(processorState, referencingComponent);
+
+                // bulletin
+                var processorBulletins = $('<div class="referencing-component-bulletins"></div>').addClass(referencingComponent.id + '-bulletins');
                 
                 // type
                 var processorType = $('<span class="referencing-component-type"></span>').text(nf.Common.substringAfterLast(referencingComponent.type, '.'));
@@ -351,7 +408,7 @@ nf.ControllerService = (function () {
                 }
                 
                 // processor
-                var processorItem = $('<li></li>').append(processorState).append(processorLink).append(processorType).append(processorActiveThreadCount);
+                var processorItem = $('<li></li>').append(processorState).append(processorBulletins).append(processorLink).append(processorType).append(processorActiveThreadCount);
                 processors.append(processorItem);
             } else if (referencingComponent.referenceType === 'ControllerService') {
                 var serviceLink = $('<span class="referencing-component-name link"></span>').text(referencingComponent.name).on('click', function () {
@@ -392,11 +449,14 @@ nf.ControllerService = (function () {
                 var serviceState = $('<div class="referencing-component-state"></div>').addClass(referencingComponent.id + '-state');
                 updateReferencingServiceState(serviceState, referencingComponent);
                 
+                // bulletin
+                var serviceBulletins = $('<div class="referencing-component-bulletins"></div>').addClass(referencingComponent.id + '-bulletins');
+                
                 // type
                 var serviceType = $('<span class="referencing-component-type"></span>').text(nf.Common.substringAfterLast(referencingComponent.type, '.'));
                 
                 // service
-                var serviceItem = $('<li></li>').append(serviceTwist).append(serviceState).append(serviceLink).append(serviceType).append(referencingServiceReferencesContainer);
+                var serviceItem = $('<li></li>').append(serviceTwist).append(serviceState).append(serviceBulletins).append(serviceLink).append(serviceType).append(referencingServiceReferencesContainer);
                 
                 services.append(serviceItem);
             } else if (referencingComponent.referenceType === 'ReportingTask') {
@@ -420,6 +480,9 @@ nf.ControllerService = (function () {
                 var reportingTaskState = $('<div class="referencing-component-state"></div>').addClass(referencingComponent.id + '-state');
                 updateReferencingSchedulableComponentState(reportingTaskState, referencingComponent);
                 
+                // bulletins
+                var reportingTaskBulletins = $('<div class="referencing-component-bulletins"></div>').addClass(referencingComponent.id + '-bulletins');
+                
                 // type
                 var reportingTaskType = $('<span class="referencing-component-type"></span>').text(nf.Common.substringAfterLast(referencingComponent.type, '.'));
                 
@@ -430,11 +493,17 @@ nf.ControllerService = (function () {
                 }
                 
                 // reporting task
-                var reportingTaskItem = $('<li></li>').append(reportingTaskState).append(reportingTaskLink).append(reportingTaskType).append(reportingTaskActiveThreadCount);
+                var reportingTaskItem = $('<li></li>').append(reportingTaskState).append(reportingTaskBulletins).append(reportingTaskLink).append(reportingTaskType).append(reportingTaskActiveThreadCount);
                 tasks.append(reportingTaskItem);
             }
         });
         
+        // query for the bulletins
+        queryBulletins(referencingComponentIds).done(function(response) {
+            var bulletins = response.bulletinBoard.bulletins;
+            updateReferencingComponentBulletins(bulletins);
+        });
+        
         // create the collapsable listing for each type
         var createReferenceBlock = function (titleText, list) {
             if (list.is(':empty')) {
@@ -466,6 +535,25 @@ nf.ControllerService = (function () {
     };
     
     /**
+     * Queries for bulletins for the specified components.
+     * 
+     * @param {array} componentIds
+     * @returns {deferred}
+     */
+    var queryBulletins = function(componentIds) {
+        var ids = componentIds.join('|');
+        
+        return $.ajax({
+            type: 'GET',
+            url: '../nifi-api/controller/bulletin-board',
+            data: {
+                sourceId: ids
+            },
+            dataType: 'json'
+        }).fail(nf.Common.handleAjaxError);
+    };
+    
+    /**
      * Sets whether the specified controller service is enabled.
      * 
      * @param {object} controllerService
@@ -491,13 +579,23 @@ nf.ControllerService = (function () {
         // wait unil the polling of each service finished
         return $.Deferred(function(deferred) {
             updated.done(function() {
-                var serviceUpdated = pollService(controllerService, function (service) {
+                var serviceUpdated = pollService(controllerService, function (service, bulletins) {
+                    if ($.isArray(bulletins)) {
+                        if (enabled) {
+                            updateBulletins(bulletins, $('#enable-controller-service-bulletins'));
+                        } else {
+                            updateBulletins(bulletins, $('#disable-controller-service-bulletins'));
+                        }
+                    }
+                    
                     // the condition is met once the service is ENABLED/DISABLED
                     if (enabled) {
                         return service.state === 'ENABLED';
                     } else {
                         return service.state === 'DISABLED';
                     }
+                }, function(service) {
+                    return queryBulletins([service.id]);
                 }, pollCondition);
 
                 // once the service has updated, resolve and render the updated service
@@ -608,9 +706,10 @@ nf.ControllerService = (function () {
      * 
      * @param {object} controllerService
      * @param {function} completeCondition
+     * @param {function} bulletinDeferred
      * @param {function} pollCondition
      */
-    var pollService = function (controllerService, completeCondition, pollCondition) {
+    var pollService = function (controllerService, completeCondition, bulletinDeferred, pollCondition) {
         // we want to keep polling until the condition is met
         return $.Deferred(function(deferred) {
             var current = 2;
@@ -618,19 +717,24 @@ nf.ControllerService = (function () {
                 var val = current;
                 
                 // update the current timeout for the next time
-                current = Math.max(current * 2, 8);
+                current = Math.min(current * 2, 4);
                 
                 return val * 1000;
             };
             
             // polls for the current status of the referencing components
             var poll = function() {
-                $.ajax({
+                var bulletins = bulletinDeferred(controllerService);
+                var service =  $.ajax({
                     type: 'GET',
                     url: controllerService.uri,
                     dataType: 'json'
-                }).done(function (response) {
-                    conditionMet(response.controllerService);
+                });
+                
+                $.when(bulletins, service).done(function (bulletinResult, serviceResult) {
+                    var bulletinResponse = bulletinResult[0];
+                    var serviceResponse = serviceResult[0];
+                    conditionMet(serviceResponse.controllerService, bulletinResponse.bulletinBoard.bulletins);
                 }).fail(function (xhr, status, error) {
                     deferred.reject();
                     nf.Common.handleAjaxError(xhr, status, error);
@@ -638,8 +742,8 @@ nf.ControllerService = (function () {
             };
             
             // tests to if the condition has been met
-            var conditionMet = function (service) {
-                if (completeCondition(service)) {
+            var conditionMet = function (service, bulletins) {
+                if (completeCondition(service, bulletins)) {
                     deferred.resolve();
                 } else {
                     if (typeof pollCondition === 'function' && pollCondition()) {
@@ -651,7 +755,12 @@ nf.ControllerService = (function () {
             };
             
             // poll for the status of the referencing components
-            conditionMet(controllerService);
+            bulletinDeferred(controllerService).done(function(response) {
+                conditionMet(controllerService, response.bulletinBoard.bulletins);
+            }).fail(function (xhr, status, error) {
+                deferred.reject();
+                nf.Common.handleAjaxError(xhr, status, error);
+            });
         }).promise();
     };
     
@@ -664,7 +773,7 @@ nf.ControllerService = (function () {
      */
     var stopReferencingSchedulableComponents = function (controllerService, pollCondition) {
         // continue to poll the service until all schedulable components have stopped
-        return pollService(controllerService, function (service) {
+        return pollService(controllerService, function (service, bulletins) {
             var referencingComponents = service.referencingComponents;
             
             var stillRunning = false;
@@ -682,9 +791,23 @@ nf.ControllerService = (function () {
                     updateReferencingSchedulableComponentState(referencingComponentState, referencingComponent);
                 }
             });
+            
+            // query for the bulletins
+            updateReferencingComponentBulletins(bulletins);
 
             // condition is met once all referencing are not running
             return stillRunning === false;
+        }, function(service) {
+            var referencingSchedulableComponents = [];
+            
+            var referencingComponents = service.referencingComponents;
+            $.each(referencingComponents, function(_, referencingComponent) {
+                if (referencingComponent.referenceType === 'Processor' || referencingComponent.referenceType === 'ReportingTask') {
+                    referencingSchedulableComponents.push(referencingComponent.id);
+                }
+            });
+            
+            return queryBulletins(referencingSchedulableComponents);
         }, pollCondition);
     };
     
@@ -696,7 +819,7 @@ nf.ControllerService = (function () {
      */
     var enableReferencingServices = function (controllerService, pollCondition) {
         // continue to poll the service until all referencing services are enabled
-        return pollService(controllerService, function (service) {
+        return pollService(controllerService, function (service, bulletins) {
             var referencingComponents = service.referencingComponents;
             
             var notEnabled = false;
@@ -711,9 +834,23 @@ nf.ControllerService = (function () {
                     updateReferencingServiceState(referencingServiceState, referencingComponent);
                 }
             });
+            
+            // query for the bulletins
+            updateReferencingComponentBulletins(bulletins);
 
             // condition is met once all referencing are not disabled
             return notEnabled === false;
+        }, function(service) {
+            var referencingSchedulableComponents = [];
+            
+            var referencingComponents = service.referencingComponents;
+            $.each(referencingComponents, function(_, referencingComponent) {
+                if (referencingComponent.referenceType === 'ControllerService') {
+                    referencingSchedulableComponents.push(referencingComponent.id);
+                }
+            });
+            
+            return queryBulletins(referencingSchedulableComponents);
         }, pollCondition);
     };
     
@@ -725,7 +862,7 @@ nf.ControllerService = (function () {
      */
     var disableReferencingServices = function (controllerService, pollCondition) {
         // continue to poll the service until all referencing services are disabled
-        return pollService(controllerService, function (service) {
+        return pollService(controllerService, function (service, bulletins) {
             var referencingComponents = service.referencingComponents;
             
             var notDisabled = false;
@@ -740,9 +877,23 @@ nf.ControllerService = (function () {
                     updateReferencingServiceState(referencingServiceState, referencingComponent);
                 }
             });
+            
+            // query for the bulletins
+            updateReferencingComponentBulletins(bulletins);
 
             // condition is met once all referencing are not enabled
             return notDisabled === false;
+        }, function(service) {
+            var referencingSchedulableComponents = [];
+            
+            var referencingComponents = service.referencingComponents;
+            $.each(referencingComponents, function(_, referencingComponent) {
+                if (referencingComponent.referenceType === 'ControllerService') {
+                    referencingSchedulableComponents.push(referencingComponent.id);
+                }
+            });
+            
+            return queryBulletins(referencingSchedulableComponents);
         }, pollCondition);
     };
     
@@ -823,6 +974,11 @@ nf.ControllerService = (function () {
         // show the dialog
         $('#disable-controller-service-dialog').modal('show');
         
+        // load the bulletins
+        queryBulletins([controllerService.id]).done(function(response) {
+            updateBulletins(response.bulletinBoard.bulletins, $('#disable-controller-service-bulletins'));
+        });
+        
         // update the border if necessary
         updateReferencingComponentsBorder(referencingComponentsContainer);
     };
@@ -844,6 +1000,11 @@ nf.ControllerService = (function () {
         // show the dialog
         $('#enable-controller-service-dialog').modal('show');
         
+        // load the bulletins
+        queryBulletins([controllerService.id]).done(function(response) {
+            updateBulletins(response.bulletinBoard.bulletins, $('#enable-controller-service-bulletins'));
+        });
+        
         // update the border if necessary
         updateReferencingComponentsBorder(referencingComponentsContainer);
     };
@@ -873,7 +1034,6 @@ nf.ControllerService = (function () {
         }]);
 
         // show the progress
-        $('#disable-controller-service-service-container').hide();
         $('#disable-controller-service-scope-container').hide();
         $('#disable-controller-service-progress-container').show();
 
@@ -964,7 +1124,6 @@ nf.ControllerService = (function () {
         }
 
         // show the progress
-        $('#enable-controller-service-service-container').hide();
         $('#enable-controller-service-scope-container').hide();
         $('#enable-controller-service-progress-container').show();
 
@@ -1223,6 +1382,7 @@ nf.ControllerService = (function () {
                         // empty the referencing components list
                         var referencingComponents = $('#controller-service-referencing-components');
                         nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-state');
+                        nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-bulletins');
                         referencingComponents.css('border-width', '0').empty();
                         
                         // cancel any active edits
@@ -1271,7 +1431,6 @@ nf.ControllerService = (function () {
                         var disableDialog = $(this);
                         
                         // reset visibility
-                        $('#disable-controller-service-service-container').show();
                         $('#disable-controller-service-scope-container').show();
                         $('#disable-controller-service-progress-container').hide();
                         
@@ -1279,12 +1438,17 @@ nf.ControllerService = (function () {
                         $('#disable-controller-service-id').text('');
                         $('#disable-controller-service-name').text('');
                         
+                        // bulletins
+                        $('#disable-controller-service-bulletins').removeClass('has-bulletins').removeData('bulletins').hide();
+                        nf.Common.cleanUpTooltips($('#disable-controller-service-service-container'), '#disable-controller-service-bulletins');
+                        
                         // reset progress
                         $('div.disable-referencing-components').removeClass('ajax-loading ajax-complete ajax-error');
                         
                         // referencing components
                         var referencingComponents = $('#disable-controller-service-referencing-components');
                         nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-state');
+                        nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-bulletins');
                         referencingComponents.css('border-width', '0').empty();
                         
                         // reset dialog
@@ -1339,7 +1503,6 @@ nf.ControllerService = (function () {
                         var enableDialog = $(this);
                         
                         // reset visibility
-                        $('#enable-controller-service-service-container').show();
                         $('#enable-controller-service-scope-container').show();
                         $('#enable-controller-service-progress-container').hide();
                         $('#enable-controller-service-progress li.referencing-component').show();
@@ -1348,12 +1511,17 @@ nf.ControllerService = (function () {
                         $('#enable-controller-service-id').text('');
                         $('#enable-controller-service-name').text('');
                         
+                        // bulletins
+                        $('#enable-controller-service-bulletins').removeClass('has-bulletins').removeData('bulletins').hide();
+                        nf.Common.cleanUpTooltips($('#enable-controller-service-service-container'), '#enable-controller-service-bulletins');
+                        
                         // reset progress
                         $('div.enable-referencing-components').removeClass('ajax-loading ajax-complete ajax-error');
                         
                         // referencing components
                         var referencingComponents = $('#enable-controller-service-referencing-components');
                         nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-state');
+                        nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-bulletins');
                         referencingComponents.css('border-width', '0').empty();
                         
                         // reset dialog
@@ -1654,13 +1822,7 @@ nf.ControllerService = (function () {
          * @param {object} controllerService
          */
         enable: function(controllerService) {
-            if (nf.Common.isEmpty(controllerService.referencingComponents)) {
-                setEnabled(controllerService, true).always(function () {
-                    reloadControllerServiceAndReferencingComponents(controllerService);
-                });
-            } else {
-                showEnableControllerServiceDialog(controllerService);
-            }
+            showEnableControllerServiceDialog(controllerService);
         },
         
         /**
@@ -1669,13 +1831,7 @@ nf.ControllerService = (function () {
          * @param {object} controllerService
          */
         disable: function(controllerService) {
-            if (nf.Common.isEmpty(controllerService.referencingComponents)) {
-                setEnabled(controllerService, false).always(function () {
-                    reloadControllerServiceAndReferencingComponents(controllerService);
-                });
-            } else {
-                showDisableControllerServiceDialog(controllerService);
-            }
+            showDisableControllerServiceDialog(controllerService);
         },
         
         /**


[02/10] incubator-nifi git commit: NIFI-724: Enable bulletins for reporting tasks and controller services

Posted by mc...@apache.org.
NIFI-724: Enable bulletins for reporting tasks and controller services


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e240e07a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e240e07a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e240e07a

Branch: refs/heads/develop
Commit: e240e07aaebea1fd66b22fce8aec3f0005fd3f60
Parents: e767f5c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jun 24 14:03:34 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jun 25 10:56:49 2015 -0400

----------------------------------------------------------------------
 .../org/apache/nifi/reporting/Bulletin.java     | 11 ++-
 .../apache/nifi/reporting/ComponentType.java    | 58 ++++++++++++++
 .../manager/impl/ClusteredReportingContext.java | 46 ++++++++++-
 .../cluster/manager/impl/WebClusterManager.java | 80 ++++++++++++--------
 .../org/apache/nifi/events/BulletinFactory.java | 30 ++++++--
 .../org/apache/nifi/events/SystemBulletin.java  |  2 +
 .../nifi/logging/LogRepositoryFactory.java      |  6 +-
 .../apache/nifi/controller/FlowController.java  | 35 ++++++---
 .../service/ControllerServiceLoader.java        | 11 +--
 .../nifi/events/VolatileBulletinRepository.java | 18 +++--
 .../org/apache/nifi/jaxb/AdaptedBulletin.java   | 10 +++
 .../org/apache/nifi/jaxb/BulletinAdapter.java   |  3 +-
 .../logging/ControllerServiceLogObserver.java   | 45 +++++++++++
 .../nifi/logging/ReportingTaskLogObserver.java  | 45 +++++++++++
 .../nifi/remote/StandardRemoteProcessGroup.java | 16 ++--
 .../nifi/remote/StandardRootGroupPort.java      |  4 +-
 16 files changed, 349 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
index 87443a3..fe370ae 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
@@ -34,6 +34,7 @@ public abstract class Bulletin implements Comparable<Bulletin> {
     private String groupId;
     private String sourceId;
     private String sourceName;
+    private ComponentType sourceType;
 
     protected Bulletin(final long id) {
         this.timestamp = new Date();
@@ -104,9 +105,17 @@ public abstract class Bulletin implements Comparable<Bulletin> {
         this.sourceName = sourceName;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(ComponentType sourceType) {
+        this.sourceType = sourceType;
+    }
+
     @Override
     public String toString() {
-        return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + '}';
+        return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}';
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
new file mode 100644
index 0000000..97f3538
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+/**
+ * An Enumeration for indicating which type of component a Bulletin is associated with
+ */
+public enum ComponentType {
+
+    /**
+     * Bulletin is associated with a Processor
+     */
+    PROCESSOR,
+
+    /**
+     * Bulletin is associated with a Remote Process Group
+     */
+    REMOTE_PROCESS_GROUP,
+
+    /**
+     * Bulletin is associated with an Input Port
+     */
+    INPUT_PORT,
+
+    /**
+     * Bulletin is associated with an Output Port
+     */
+    OUTPUT_PORT,
+
+    /**
+     * Bulletin is associated with a Reporting Task
+     */
+    REPORTING_TASK,
+
+    /**
+     * Bulletin is associated with a Controller Service
+     */
+    CONTROLLER_SERVICE,
+
+    /**
+     * Bulletin is a system-level bulletin, associated with the Flow Controller
+     */
+    FLOW_CONTROLLER;
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
index e546f87..c6624cc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -24,15 +24,18 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
 import org.apache.nifi.attribute.expression.language.Query;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
 import org.apache.nifi.controller.ControllerServiceLookup;
 import org.apache.nifi.controller.service.ControllerServiceProvider;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.processor.StandardPropertyValue;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.Severity;
@@ -85,8 +88,9 @@ public class ClusteredReportingContext implements ReportingContext {
         final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
         final String groupId = findGroupId(rootGroupStatus, componentId);
         final String componentName = findComponentName(rootGroupStatus, componentId);
+        final ComponentType componentType = findComponentType(rootGroupStatus, componentId);
 
-        return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message);
+        return BulletinFactory.createBulletin(groupId, componentId, componentType, componentName, category, severity.name(), message);
     }
 
     @Override
@@ -134,6 +138,46 @@ public class ClusteredReportingContext implements ReportingContext {
         return null;
     }
 
+    private ComponentType findComponentType(final ProcessGroupStatus groupStatus, final String componentId) {
+        for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+            if (procStatus.getId().equals(componentId)) {
+                return ComponentType.PROCESSOR;
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return ComponentType.INPUT_PORT;
+            }
+        }
+
+        for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+            if (portStatus.getId().equals(componentId)) {
+                return ComponentType.OUTPUT_PORT;
+            }
+        }
+
+        for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) {
+            if (remoteStatus.getId().equals(componentId)) {
+                return ComponentType.REMOTE_PROCESS_GROUP;
+            }
+        }
+
+        for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+            final ComponentType type = findComponentType(childGroup, componentId);
+            if (type != null) {
+                return type;
+            }
+        }
+
+        final ControllerService service = serviceProvider.getControllerService(componentId);
+        if (service != null) {
+            return ComponentType.CONTROLLER_SERVICE;
+        }
+
+        return null;
+    }
+
     private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) {
         for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
             if (procStatus.getId().equals(componentId)) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index d6ba6db..9edc83f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -129,6 +129,7 @@ import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ReportingTaskNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.StandardFlowSerializer;
+import org.apache.nifi.controller.StandardProcessorNode;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
 import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
@@ -159,7 +160,12 @@ import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.framework.security.util.SslContextFactory;
 import org.apache.nifi.io.socket.multicast.DiscoverableService;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -929,7 +935,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             //optional properties for all ReportingTasks
             for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
                 //add global properties common to all tasks
-                Map<String, String> properties = new HashMap<>();
+                final Map<String, String> properties = new HashMap<>();
 
                 //get properties for the specific reporting task - id, name, class,
                 //and schedulingPeriod must be set
@@ -1080,6 +1086,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             }
         }
 
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
         return taskNode;
     }
 
@@ -1368,7 +1379,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+        final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+        return serviceNode;
     }
 
     @Override
@@ -1630,7 +1648,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
 
         // unmarshal the message
-        BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
+        final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
         for (final Bulletin bulletin : payload.getBulletins()) {
             bulletin.setNodeAddress(nodeAddress);
             bulletinRepository.addBulletin(bulletin);
@@ -1688,7 +1706,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             final int numPendingHeartbeats = mostRecentHeartbeats.size();
             if (heartbeatLogger.isDebugEnabled()) {
-                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+                heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
             }
 
             for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
@@ -2130,7 +2148,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         readLock.lock();
         try {
             final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
-            return (broadcaster != null && broadcaster.isRunning());
+            return broadcaster != null && broadcaster.isRunning();
         } finally {
             readLock.unlock("isBroadcasting");
         }
@@ -2323,7 +2341,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                                 if (auditService != null) {
                                     try {
                                         auditService.addActions(clusterContext.getActions());
-                                    } catch (Throwable t) {
+                                    } catch (final Throwable t) {
                                         logger.warn("Unable to record actions: " + t.getMessage());
                                         if (logger.isDebugEnabled()) {
                                             logger.warn(StringUtils.EMPTY, t);
@@ -2834,7 +2852,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProcessorEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
+                final ProcessorEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
                 final ProcessorDTO nodeProcessor = nodeResponseEntity.getProcessor();
                 processorMap.put(nodeResponse.getNodeId(), nodeProcessor);
             }
@@ -2851,7 +2869,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProcessorsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+                final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
                 final Set<ProcessorDTO> nodeProcessors = nodeResponseEntity.getProcessors();
 
                 for (final ProcessorDTO nodeProcessor : nodeProcessors) {
@@ -2892,7 +2910,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                         continue;
                     }
 
-                    final ProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+                    final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
                     final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getProcessGroup();
 
                     for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) {
@@ -2952,7 +2970,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                         continue;
                     }
 
-                    final FlowSnippetEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+                    final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
                     final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
 
                     for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
@@ -2995,7 +3013,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
             // create a new client response
             clientResponse = new NodeResponse(clientResponse, responseEntity);
-        } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupEndpoint(uri, method))) {
+        } else if (hasSuccessfulClientResponse && isRemoteProcessGroupEndpoint(uri, method)) {
             final RemoteProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
             final RemoteProcessGroupDTO remoteProcessGroup = responseEntity.getRemoteProcessGroup();
 
@@ -3005,7 +3023,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final RemoteProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
+                final RemoteProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
                 final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeResponseEntity.getRemoteProcessGroup();
 
                 remoteProcessGroupMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
@@ -3013,7 +3031,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap);
 
             clientResponse = new NodeResponse(clientResponse, responseEntity);
-        } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupsEndpoint(uri, method))) {
+        } else if (hasSuccessfulClientResponse && isRemoteProcessGroupsEndpoint(uri, method)) {
             final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
             final Set<RemoteProcessGroupDTO> remoteProcessGroups = responseEntity.getRemoteProcessGroups();
 
@@ -3023,7 +3041,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final RemoteProcessGroupsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+                final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
                 final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = nodeResponseEntity.getRemoteProcessGroups();
 
                 for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeRemoteProcessGroups) {
@@ -3056,7 +3074,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ProvenanceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+                final ProvenanceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
                 final ProvenanceDTO nodeQuery = nodeResponseEntity.getProvenance();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeQuery);
@@ -3084,7 +3102,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+                final ControllerServiceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
                 final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
@@ -3102,7 +3120,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+                final ControllerServicesEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
                 final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
 
                 for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
@@ -3136,7 +3154,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                 }
 
                 final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
-                        (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+                        nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
                 final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@@ -3154,7 +3172,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+                final ReportingTaskEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
                 final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
 
                 resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
@@ -3172,7 +3190,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
                     continue;
                 }
 
-                final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+                final ReportingTasksEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
                 final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
 
                 for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
@@ -3428,7 +3446,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
     }
 
     private boolean canChangeNodeState(final String method, final URI uri) {
-        return (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method));
+        return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
     }
 
     private void notifyDataFlowManagementServiceOfNodeStatusChange() {
@@ -3477,7 +3495,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
             public void run() {
                 logger.info("Entering safe mode...");
                 final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
-                final long timeToElect = (safeModeSeconds <= 0) ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
+                final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
                 boolean exitSafeMode = false;
                 while (isRunning()) {
 
@@ -3819,7 +3837,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
 
     public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
         final long time = toNormalize.getTime();
-        return new Date(time - (time % numMillis));
+        return new Date(time - time % numMillis);
     }
 
     private NodeDTO createNodeDTO(final Node node) {
@@ -3861,8 +3879,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -3942,8 +3960,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4006,8 +4024,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4070,8 +4088,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
         final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
 
         StatusHistoryDTO lastStatusHistory = null;
-        Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
-        Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+        final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
+        final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
 
         for (final Node node : getRawNodes()) {
             final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
index d1d5e5b..4795827 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -17,24 +17,43 @@
 package org.apache.nifi.events;
 
 import java.util.concurrent.atomic.AtomicLong;
+
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
 
-/**
- *
- */
 public final class BulletinFactory {
 
     private static final AtomicLong currentId = new AtomicLong(0);
 
     public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
-        return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message);
+        final ComponentType type;
+        switch (connectable.getConnectableType()) {
+            case REMOTE_INPUT_PORT:
+            case REMOTE_OUTPUT_PORT:
+                type = ComponentType.REMOTE_PROCESS_GROUP;
+                break;
+            case INPUT_PORT:
+                type = ComponentType.INPUT_PORT;
+                break;
+            case OUTPUT_PORT:
+                type = ComponentType.OUTPUT_PORT;
+                break;
+            case PROCESSOR:
+            default:
+                type = ComponentType.PROCESSOR;
+                break;
+        }
+
+        return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
     }
 
-    public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
+    public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName,
+        final String category, final String severity, final String message) {
         final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
         bulletin.setGroupId(groupId);
         bulletin.setSourceId(sourceId);
+        bulletin.setSourceType(sourceType);
         bulletin.setSourceName(sourceName);
         bulletin.setCategory(category);
         bulletin.setLevel(severity);
@@ -47,6 +66,7 @@ public final class BulletinFactory {
         bulletin.setCategory(category);
         bulletin.setLevel(severity);
         bulletin.setMessage(message);
+        bulletin.setSourceType(ComponentType.FLOW_CONTROLLER);
         return bulletin;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
index f97dc46..3359e7e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -17,6 +17,7 @@
 package org.apache.nifi.events;
 
 import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
 
 /**
  *
@@ -25,6 +26,7 @@ public class SystemBulletin extends Bulletin {
 
     SystemBulletin(final long id) {
         super(id);
+        setSourceType(ComponentType.FLOW_CONTROLLER);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index 76ca661..d7fa3fc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -41,8 +41,8 @@ public class LogRepositoryFactory {
         logRepositoryClass = clazz;
     }
 
-    public static LogRepository getRepository(final String processorId) {
-        LogRepository repository = repositoryMap.get(requireNonNull(processorId));
+    public static LogRepository getRepository(final String componentId) {
+        LogRepository repository = repositoryMap.get(requireNonNull(componentId));
         if (repository == null) {
             try {
                 repository = logRepositoryClass.newInstance();
@@ -50,7 +50,7 @@ public class LogRepositoryFactory {
                 throw new RuntimeException(e);
             }
 
-            final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository);
+            final LogRepository oldRepository = repositoryMap.putIfAbsent(componentId, repository);
             if (oldRepository != null) {
                 repository = oldRepository;
             }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ffdd4e..b6edbbb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -137,11 +137,13 @@ import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.groups.StandardProcessGroup;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
 import org.apache.nifi.logging.LogLevel;
 import org.apache.nifi.logging.LogRepository;
 import org.apache.nifi.logging.LogRepositoryFactory;
 import org.apache.nifi.logging.ProcessorLog;
 import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
 import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -593,7 +595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         writeLock.lock();
         try {
             if (startDelayedComponents) {
-                LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
+                LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
                 for (final Connectable connectable : startConnectablesAfterInitialization) {
                     if (connectable.getScheduledState() == ScheduledState.DISABLED) {
                         continue;
@@ -1012,7 +1014,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public boolean isTerminated() {
         this.readLock.lock();
         try {
-            return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated());
+            return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
         } finally {
             this.readLock.unlock();
         }
@@ -1828,9 +1830,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         } else if (id1.equals(id2)) {
             return true;
         } else {
-            final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1);
-            final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2);
-            return (comparable1.equals(comparable2));
+            final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1;
+            final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2;
+            return comparable1.equals(comparable2);
         }
     }
 
@@ -1964,7 +1966,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
-        return (root == null) ? null : root.findProcessGroup(searchId);
+        return root == null ? null : root.findProcessGroup(searchId);
     }
 
     @Override
@@ -2079,8 +2081,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                 connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
                 connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
 
-                flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
-                bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
+                flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
+                bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
             }
 
             if (StringUtils.isNotBlank(conn.getName())) {
@@ -2552,6 +2554,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
 
         reportingTasks.put(id, taskNode);
+
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
         return taskNode;
     }
 
@@ -2616,7 +2624,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
     @Override
     public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
-        return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+        final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+        // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+        final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+        logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+            new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+        return serviceNode;
     }
 
     @Override
@@ -3480,7 +3495,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     if (bulletin.getGroupId() == null) {
                         escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     } else {
-                        escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(),
+                        escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
                                 bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
                     }
                 } else {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 92fa3b2..b5c3855 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -48,11 +48,12 @@ public class ControllerServiceLoader {
     private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
 
     public static List<ControllerServiceNode> loadControllerServices(
-            final ControllerServiceProvider provider,
-            final InputStream serializedStream,
-            final StringEncryptor encryptor,
-            final BulletinRepository bulletinRepo,
-            final boolean autoResumeState) throws IOException {
+        final ControllerServiceProvider provider,
+        final InputStream serializedStream,
+        final StringEncryptor encryptor,
+        final BulletinRepository bulletinRepo,
+        final boolean autoResumeState) throws IOException {
+
         final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
         documentBuilderFactory.setNamespaceAware(true);
 

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index a20e974..5172d34 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinQuery;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.util.RingBuffer;
 import org.apache.nifi.util.RingBuffer.Filter;
 
@@ -167,7 +168,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
         }
 
         final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
-        return (buffer == null) ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+        return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
             @Override
             public boolean select(final Bulletin bulletin) {
                 return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
@@ -194,12 +195,12 @@ public class VolatileBulletinRepository implements BulletinRepository {
     }
 
     private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) {
-        final String groupId = getBulletinStoreKey(bulletin);
+        final String storageKey = getBulletinStoreKey(bulletin);
 
-        ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(groupId);
+        ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey);
         if (componentMap == null) {
             componentMap = new ConcurrentHashMap<>();
-            ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap);
+            final ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(storageKey, componentMap);
             if (existing != null) {
                 componentMap = existing;
             }
@@ -221,11 +222,16 @@ public class VolatileBulletinRepository implements BulletinRepository {
     }
 
     private String getBulletinStoreKey(final Bulletin bulletin) {
-        return isControllerBulletin(bulletin) ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getGroupId();
+        if (isControllerBulletin(bulletin)) {
+            return CONTROLLER_BULLETIN_STORE_KEY;
+        }
+
+        final String groupId = bulletin.getGroupId();
+        return groupId == null ? bulletin.getSourceId() : groupId;
     }
 
     private boolean isControllerBulletin(final Bulletin bulletin) {
-        return bulletin.getGroupId() == null;
+        return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
     }
 
     private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
index 6f1dc45..17c5991 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
@@ -18,6 +18,8 @@ package org.apache.nifi.jaxb;
 
 import java.util.Date;
 
+import org.apache.nifi.reporting.ComponentType;
+
 /**
  *
  */
@@ -32,6 +34,7 @@ public class AdaptedBulletin {
     private String groupId;
     private String sourceId;
     private String sourceName;
+    private ComponentType sourceType;
 
     public String getCategory() {
         return category;
@@ -97,4 +100,11 @@ public class AdaptedBulletin {
         this.timestamp = timestamp;
     }
 
+    public ComponentType getSourceType() {
+        return sourceType;
+    }
+
+    public void setSourceType(ComponentType sourceType) {
+        this.sourceType = sourceType;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
index b699348..acbe0dd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
@@ -34,7 +34,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
         if (b.getSourceId() == null) {
             return BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage());
         } else {
-            return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
+            return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceType(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
         }
     }
 
@@ -48,6 +48,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
         aBulletin.setTimestamp(b.getTimestamp());
         aBulletin.setGroupId(b.getGroupId());
         aBulletin.setSourceId(b.getSourceId());
+        aBulletin.setSourceType(b.getSourceType());
         aBulletin.setSourceName(b.getSourceName());
         aBulletin.setCategory(b.getCategory());
         aBulletin.setLevel(b.getLevel());

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
new file mode 100644
index 0000000..837e1c4
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ControllerServiceLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ControllerServiceNode serviceNode;
+
+    public ControllerServiceLogObserver(final BulletinRepository bulletinRepository, final ControllerServiceNode serviceNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.serviceNode = serviceNode;
+    }
+
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.REPORTING_TASK,
+            serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
new file mode 100644
index 0000000..e5638d6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ReportingTaskLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ReportingTaskNode taskNode;
+
+    public ReportingTaskLogObserver(final BulletinRepository bulletinRepository, final ReportingTaskNode taskNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.taskNode = taskNode;
+    }
+
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
+            taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index d19b5c1..61516d0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -57,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
 import org.apache.nifi.groups.RemoteProcessGroup;
 import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.util.FormatUtils;
 import org.apache.nifi.util.NiFiProperties;
@@ -164,7 +165,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                 final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();
                 final String sourceId = StandardRemoteProcessGroup.this.getIdentifier();
                 final String sourceName = StandardRemoteProcessGroup.this.getName();
-                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, ComponentType.REMOTE_PROCESS_GROUP,
+                    sourceName, category, severity.name(), message));
             }
         };
 
@@ -227,7 +229,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     @Override
     public String getName() {
         final String name = this.name.get();
-        return (name == null) ? targetUri.toString() : name;
+        return name == null ? targetUri.toString() : name;
     }
 
     @Override
@@ -671,7 +673,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
 
     private ProcessGroup getRootGroup(final ProcessGroup context) {
         final ProcessGroup parent = context.getParent();
-        return (parent == null) ? context : getRootGroup(parent);
+        return parent == null ? context : getRootGroup(parent);
     }
 
     private boolean isWebApiSecure() {
@@ -714,7 +716,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
     public Date getLastRefreshTime() {
         readLock.lock();
         try {
-            return (refreshContentsTimestamp == null) ? null : new Date(refreshContentsTimestamp);
+            return refreshContentsTimestamp == null ? null : new Date(refreshContentsTimestamp);
         } finally {
             readLock.unlock();
         }
@@ -855,7 +857,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
         Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
         if (ports != null) {
             remotePorts = new LinkedHashSet<>(ports.size());
-            for (PortDTO port : ports) {
+            for (final PortDTO port : ports) {
                 final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
                 final ScheduledState scheduledState = ScheduledState.valueOf(port.getState());
                 descriptor.setId(port.getId());
@@ -1093,7 +1095,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                     }
 
                     final String remoteInstanceId = dto.getInstanceId();
-                    boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
+                    final boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
                     pointsToCluster.set(isPointingToCluster);
                 } else if (statusCode == UNAUTHORIZED_STATUS_CODE) {
                     try {
@@ -1120,7 +1122,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
                             new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
                     authorizationIssue = "Unable to determine Site-to-Site availability.";
                 }
-            } catch (Exception e) {
+            } catch (final Exception e) {
                 logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
                 getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
                         StandardRemoteProcessGroup.this.getTargetUri().toString(), e));

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4bb1683..9eadec0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -61,6 +61,7 @@ import org.apache.nifi.remote.exception.TransmissionDisabledException;
 import org.apache.nifi.remote.protocol.CommunicationsSession;
 import org.apache.nifi.remote.protocol.ServerProtocol;
 import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
 import org.apache.nifi.reporting.Severity;
 import org.apache.nifi.scheduling.SchedulingStrategy;
 import org.apache.nifi.user.NiFiUser;
@@ -108,7 +109,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
                 final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
                 final String sourceId = StandardRootGroupPort.this.getIdentifier();
                 final String sourceName = StandardRootGroupPort.this.getName();
-                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+                final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
+                bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, componentType, sourceName, category, severity.name(), message));
             }
         };
 


[07/10] incubator-nifi git commit: NIFI-694: - Fixing bug when clearing bulletins from the reporting task table. - Allowed ENABLING state to trigger transition to next step of the enable/disable request.

Posted by mc...@apache.org.
NIFI-694:
- Fixing bug when clearing bulletins from the reporting task table.
- Allowed ENABLING state to trigger transition to next step of the enable/disable request.

Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4fd1e949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4fd1e949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4fd1e949

Branch: refs/heads/develop
Commit: 4fd1e9494e41bea198dabedacee16ed018a3a7b7
Parents: e7c0461
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Jul 1 14:25:12 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Jul 1 14:25:12 2015 -0400

----------------------------------------------------------------------
 .../src/main/webapp/js/nf/canvas/nf-controller-service.js      | 6 +++---
 .../nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js    | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4fd1e949/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index efcf9fb..078064a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -591,9 +591,9 @@ nf.ControllerService = (function () {
                         }
                     }
                     
-                    // the condition is met once the service is ENABLED/DISABLED
+                    // the condition is met once the service is (ENABLING or ENABLED)/DISABLED
                     if (enabled) {
-                        return service.state === 'ENABLED';
+                        return service.state === 'ENABLING' || service.state === 'ENABLED';
                     } else {
                         return service.state === 'DISABLED';
                     }
@@ -828,7 +828,7 @@ nf.ControllerService = (function () {
             var notEnabled = false;
             $.each(referencingComponents, function(_, referencingComponent) {
                 if (referencingComponent.referenceType === 'ControllerService') {
-                    if (referencingComponent.state !== 'ENABLED') {
+                    if (referencingComponent.state !== 'ENABLING' && referencingComponent.state !== 'ENABLED') {
                         notEnabled = true;
                     } 
                         

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4fd1e949/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index 3715110..476f34b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -1741,7 +1741,7 @@ nf.Settings = (function () {
                 // if there are no bulletins clear all
                 var reportingTasks = reportingTasksData.getItems();
                 $.each(reportingTasks, function(_, reportingTask) {
-                    controllerServicesData.updateItem(reportingTask.id, $.extend(reportingTask, {
+                    reportingTasksData.updateItem(reportingTask.id, $.extend(reportingTask, {
                         bulletins: []
                     }));
                 });


[10/10] incubator-nifi git commit: Merge branch 'develop' into NIFI-724

Posted by mc...@apache.org.
Merge branch 'develop' into NIFI-724


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eddc071b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eddc071b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eddc071b

Branch: refs/heads/develop
Commit: eddc071b8efc3c6ca86e373b56f214514b2abd0a
Parents: 566c4fe a091807
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jul 2 16:27:05 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 2 16:27:05 2015 -0400

----------------------------------------------------------------------
 .../nifi/controller/ConfigurationContext.java   |  23 ++-
 .../src/main/asciidoc/developer-guide.adoc      |   2 +-
 .../nifi-nifi-example-nar/pom.xml               |  35 ++++
 .../nifi-nifi-example-processors/pom.xml        |  70 +++++++
 .../nifi/processors/WriteResourceToStream.java  | 102 ++++++++++
 .../org.apache.nifi.processor.Processor         |  15 ++
 .../src/main/resources/file.txt                 |   1 +
 .../processors/WriteResourceToStreamTest.java   |  47 +++++
 nifi/nifi-external/nifi-example-bundle/pom.xml  |  42 +++++
 nifi/nifi-external/pom.xml                      |   2 +
 .../nifi/util/MockConfigurationContext.java     |  29 ++-
 .../nifi/util/StandardProcessorTestRunner.java  |   4 +-
 .../nifi-framework/nifi-documentation/pom.xml   |   5 +
 .../apache/nifi/documentation/DocGenerator.java |   5 +-
 .../html/HtmlDocumentationWriter.java           |  75 ++++++--
 .../init/ControllerServiceInitializer.java      |  20 +-
 .../init/ProcessorInitializer.java              |  17 +-
 .../init/ReportingTaskingInitializer.java       |  14 +-
 .../mock/MockConfigurationContext.java          |  48 +++++
 .../documentation/mock/MockProcessContext.java  |  85 +++++++++
 .../documentation/util/ReflectionUtils.java     | 139 ++++++++++++++
 .../nifi/documentation/DocGeneratorTest.java    |  96 ++++++++++
 .../FullyDocumentedControllerService.java       |  63 +++++--
 .../example/FullyDocumentedProcessor.java       |  49 ++++-
 .../example/FullyDocumentedReportingTask.java   |  46 ++++-
 .../html/HtmlDocumentationWriterTest.java       |  33 +++-
 .../html/ProcessorDocumentationWriterTest.java  |  19 +-
 .../src/test/resources/conf/nifi.properties     | 129 +++++++++++++
 .../src/test/resources/lib/example.nar          | Bin 0 -> 721040 bytes
 .../src/test/resources/lib/jetty.nar            | Bin 0 -> 4638519 bytes
 .../test/resources/lib/nifi-framework-nar.nar   | Bin 0 -> 406 bytes
 .../java/org/apache/nifi/nar/NarCloseable.java  |  44 -----
 .../nifi/nar/NarThreadContextClassLoader.java   | 187 -------------------
 .../apache/nifi/controller/FlowController.java  |   2 +-
 .../reporting/AbstractReportingTaskNode.java    |   4 +-
 .../repository/FileSystemRepository.java        |  20 +-
 .../repository/VolatileContentRepository.java   |   8 +-
 .../scheduling/StandardProcessScheduler.java    |  24 +--
 .../service/StandardConfigurationContext.java   |  26 ++-
 .../service/StandardControllerServiceNode.java  |   2 +-
 .../StandardControllerServiceProvider.java      |  10 +-
 ...nifi.controller.repository.ContentRepository |  16 ++
 ...ifi.controller.repository.FlowFileRepository |  16 ++
 ...fi.controller.repository.FlowFileSwapManager |  15 ++
 ...ler.status.history.ComponentStatusRepository |  15 ++
 .../java/org/apache/nifi/nar/NarCloseable.java  |  44 +++++
 .../nifi/nar/NarThreadContextClassLoader.java   | 187 +++++++++++++++++++
 .../nifi-web-ui/src/main/webapp/css/main.css    |   1 +
 .../src/main/webapp/js/nf/canvas/nf-canvas.js   |   2 +-
 .../js/nf/canvas/nf-controller-service.js       |   4 +-
 .../src/main/webapp/js/nf/nf-common.js          |  17 ++
 .../nifi/processors/kite/ConvertCSVToAvro.java  |  76 ++++++--
 .../nifi/processors/kite/ConvertJSONToAvro.java |  95 ++++++----
 .../nifi/processors/kite/FailureTracker.java    |  83 ++++++++
 .../processors/kite/TestCSVToAvroProcessor.java |  96 +++++++++-
 .../kite/TestJSONToAvroProcessor.java           | 101 +++++++++-
 .../nifi/processors/standard/ExtractText.java   |   4 +-
 .../apache/nifi/controller/MonitorMemory.java   |  14 +-
 58 files changed, 1923 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eddc071b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eddc071b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eddc071b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------