You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2015/07/02 22:47:57 UTC
[01/10] incubator-nifi git commit: NIFI-724: Enable bulletins for
reporting tasks and controller services
Repository: incubator-nifi
Updated Branches:
refs/heads/develop a09180799 -> eddc071b8
NIFI-724: Enable bulletins for reporting tasks and controller services
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/f1adb8bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/f1adb8bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/f1adb8bf
Branch: refs/heads/develop
Commit: f1adb8bf034a8407dfdd655e441c74c10a61b18f
Parents: e767f5c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jun 24 14:03:34 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jun 24 14:03:34 2015 -0400
----------------------------------------------------------------------
.../org/apache/nifi/reporting/Bulletin.java | 11 ++-
.../apache/nifi/reporting/ComponentType.java | 58 ++++++++++++++
.../manager/impl/ClusteredReportingContext.java | 46 ++++++++++-
.../cluster/manager/impl/WebClusterManager.java | 80 ++++++++++++--------
.../org/apache/nifi/events/BulletinFactory.java | 30 ++++++--
.../org/apache/nifi/events/SystemBulletin.java | 2 +
.../nifi/logging/LogRepositoryFactory.java | 6 +-
.../apache/nifi/controller/FlowController.java | 35 ++++++---
.../service/ControllerServiceLoader.java | 11 +--
.../nifi/events/VolatileBulletinRepository.java | 7 +-
.../org/apache/nifi/jaxb/AdaptedBulletin.java | 10 +++
.../org/apache/nifi/jaxb/BulletinAdapter.java | 3 +-
.../logging/ControllerServiceLogObserver.java | 45 +++++++++++
.../nifi/logging/ReportingTaskLogObserver.java | 45 +++++++++++
.../nifi/remote/StandardRemoteProcessGroup.java | 16 ++--
.../nifi/remote/StandardRootGroupPort.java | 4 +-
16 files changed, 341 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
index 87443a3..fe370ae 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
@@ -34,6 +34,7 @@ public abstract class Bulletin implements Comparable<Bulletin> {
private String groupId;
private String sourceId;
private String sourceName;
+ private ComponentType sourceType;
protected Bulletin(final long id) {
this.timestamp = new Date();
@@ -104,9 +105,17 @@ public abstract class Bulletin implements Comparable<Bulletin> {
this.sourceName = sourceName;
}
+ public ComponentType getSourceType() {
+ return sourceType;
+ }
+
+ public void setSourceType(ComponentType sourceType) {
+ this.sourceType = sourceType;
+ }
+
@Override
public String toString() {
- return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + '}';
+ return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}';
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
new file mode 100644
index 0000000..97f3538
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+/**
+ * An Enumeration for indicating which type of component a Bulletin is associated with
+ */
+public enum ComponentType {
+
+ /**
+ * Bulletin is associated with a Processor
+ */
+ PROCESSOR,
+
+ /**
+ * Bulletin is associated with a Remote Process Group
+ */
+ REMOTE_PROCESS_GROUP,
+
+ /**
+ * Bulletin is associated with an Input Port
+ */
+ INPUT_PORT,
+
+ /**
+ * Bulletin is associated with an Output Port
+ */
+ OUTPUT_PORT,
+
+ /**
+ * Bulletin is associated with a Reporting Task
+ */
+ REPORTING_TASK,
+
+ /**
+ * Bulletin is associated with a Controller Service
+ */
+ CONTROLLER_SERVICE,
+
+ /**
+ * Bulletin is a system-level bulletin, associated with the Flow Controller
+ */
+ FLOW_CONTROLLER;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
index e546f87..c6624cc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -24,15 +24,18 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
@@ -85,8 +88,9 @@ public class ClusteredReportingContext implements ReportingContext {
final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
final String groupId = findGroupId(rootGroupStatus, componentId);
final String componentName = findComponentName(rootGroupStatus, componentId);
+ final ComponentType componentType = findComponentType(rootGroupStatus, componentId);
- return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message);
+ return BulletinFactory.createBulletin(groupId, componentId, componentType, componentName, category, severity.name(), message);
}
@Override
@@ -134,6 +138,46 @@ public class ClusteredReportingContext implements ReportingContext {
return null;
}
+ private ComponentType findComponentType(final ProcessGroupStatus groupStatus, final String componentId) {
+ for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+ if (procStatus.getId().equals(componentId)) {
+ return ComponentType.PROCESSOR;
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return ComponentType.INPUT_PORT;
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return ComponentType.OUTPUT_PORT;
+ }
+ }
+
+ for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) {
+ if (remoteStatus.getId().equals(componentId)) {
+ return ComponentType.REMOTE_PROCESS_GROUP;
+ }
+ }
+
+ for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+ final ComponentType type = findComponentType(childGroup, componentId);
+ if (type != null) {
+ return type;
+ }
+ }
+
+ final ControllerService service = serviceProvider.getControllerService(componentId);
+ if (service != null) {
+ return ComponentType.CONTROLLER_SERVICE;
+ }
+
+ return null;
+ }
+
private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) {
for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
if (procStatus.getId().equals(componentId)) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index d6ba6db..9edc83f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -129,6 +129,7 @@ import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardFlowSerializer;
+import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
@@ -159,7 +160,12 @@ import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -929,7 +935,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
//optional properties for all ReportingTasks
for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
//add global properties common to all tasks
- Map<String, String> properties = new HashMap<>();
+ final Map<String, String> properties = new HashMap<>();
//get properties for the specific reporting task - id, name, class,
//and schedulingPeriod must be set
@@ -1080,6 +1086,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
+ // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
return taskNode;
}
@@ -1368,7 +1379,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+ // Register log observer to provide bulletins when controller service logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+ return serviceNode;
}
@Override
@@ -1630,7 +1648,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
// unmarshal the message
- BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
+ final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
for (final Bulletin bulletin : payload.getBulletins()) {
bulletin.setNodeAddress(nodeAddress);
bulletinRepository.addBulletin(bulletin);
@@ -1688,7 +1706,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final int numPendingHeartbeats = mostRecentHeartbeats.size();
if (heartbeatLogger.isDebugEnabled()) {
- heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+ heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
}
for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
@@ -2130,7 +2148,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
readLock.lock();
try {
final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
- return (broadcaster != null && broadcaster.isRunning());
+ return broadcaster != null && broadcaster.isRunning();
} finally {
readLock.unlock("isBroadcasting");
}
@@ -2323,7 +2341,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (auditService != null) {
try {
auditService.addActions(clusterContext.getActions());
- } catch (Throwable t) {
+ } catch (final Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
@@ -2834,7 +2852,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProcessorEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
+ final ProcessorEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
final ProcessorDTO nodeProcessor = nodeResponseEntity.getProcessor();
processorMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
@@ -2851,7 +2869,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProcessorsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+ final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
final Set<ProcessorDTO> nodeProcessors = nodeResponseEntity.getProcessors();
for (final ProcessorDTO nodeProcessor : nodeProcessors) {
@@ -2892,7 +2910,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+ final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getProcessGroup();
for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) {
@@ -2952,7 +2970,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final FlowSnippetEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+ final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
@@ -2995,7 +3013,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
- } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupEndpoint(uri, method))) {
+ } else if (hasSuccessfulClientResponse && isRemoteProcessGroupEndpoint(uri, method)) {
final RemoteProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
final RemoteProcessGroupDTO remoteProcessGroup = responseEntity.getRemoteProcessGroup();
@@ -3005,7 +3023,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final RemoteProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
+ final RemoteProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeResponseEntity.getRemoteProcessGroup();
remoteProcessGroupMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
@@ -3013,7 +3031,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
- } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupsEndpoint(uri, method))) {
+ } else if (hasSuccessfulClientResponse && isRemoteProcessGroupsEndpoint(uri, method)) {
final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupDTO> remoteProcessGroups = responseEntity.getRemoteProcessGroups();
@@ -3023,7 +3041,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final RemoteProcessGroupsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+ final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = nodeResponseEntity.getRemoteProcessGroups();
for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeRemoteProcessGroups) {
@@ -3056,7 +3074,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProvenanceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+ final ProvenanceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
final ProvenanceDTO nodeQuery = nodeResponseEntity.getProvenance();
resultsMap.put(nodeResponse.getNodeId(), nodeQuery);
@@ -3084,7 +3102,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+ final ControllerServiceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
@@ -3102,7 +3120,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+ final ControllerServicesEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
@@ -3136,7 +3154,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
- (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@@ -3154,7 +3172,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+ final ReportingTaskEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
@@ -3172,7 +3190,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+ final ReportingTasksEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
@@ -3428,7 +3446,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private boolean canChangeNodeState(final String method, final URI uri) {
- return (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method));
+ return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
}
private void notifyDataFlowManagementServiceOfNodeStatusChange() {
@@ -3477,7 +3495,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public void run() {
logger.info("Entering safe mode...");
final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
- final long timeToElect = (safeModeSeconds <= 0) ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
+ final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
boolean exitSafeMode = false;
while (isRunning()) {
@@ -3819,7 +3837,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
final long time = toNormalize.getTime();
- return new Date(time - (time % numMillis));
+ return new Date(time - time % numMillis);
}
private NodeDTO createNodeDTO(final Node node) {
@@ -3861,8 +3879,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -3942,8 +3960,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4006,8 +4024,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4070,8 +4088,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
index d1d5e5b..4795827 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -17,24 +17,43 @@
package org.apache.nifi.events;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
-/**
- *
- */
public final class BulletinFactory {
private static final AtomicLong currentId = new AtomicLong(0);
public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
- return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message);
+ final ComponentType type;
+ switch (connectable.getConnectableType()) {
+ case REMOTE_INPUT_PORT:
+ case REMOTE_OUTPUT_PORT:
+ type = ComponentType.REMOTE_PROCESS_GROUP;
+ break;
+ case INPUT_PORT:
+ type = ComponentType.INPUT_PORT;
+ break;
+ case OUTPUT_PORT:
+ type = ComponentType.OUTPUT_PORT;
+ break;
+ case PROCESSOR:
+ default:
+ type = ComponentType.PROCESSOR;
+ break;
+ }
+
+ return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
}
- public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
+ public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName,
+ final String category, final String severity, final String message) {
final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setSourceId(sourceId);
+ bulletin.setSourceType(sourceType);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
@@ -47,6 +66,7 @@ public final class BulletinFactory {
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
+ bulletin.setSourceType(ComponentType.FLOW_CONTROLLER);
return bulletin;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
index f97dc46..3359e7e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -17,6 +17,7 @@
package org.apache.nifi.events;
import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
/**
*
@@ -25,6 +26,7 @@ public class SystemBulletin extends Bulletin {
SystemBulletin(final long id) {
super(id);
+ setSourceType(ComponentType.FLOW_CONTROLLER);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index 76ca661..d7fa3fc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -41,8 +41,8 @@ public class LogRepositoryFactory {
logRepositoryClass = clazz;
}
- public static LogRepository getRepository(final String processorId) {
- LogRepository repository = repositoryMap.get(requireNonNull(processorId));
+ public static LogRepository getRepository(final String componentId) {
+ LogRepository repository = repositoryMap.get(requireNonNull(componentId));
if (repository == null) {
try {
repository = logRepositoryClass.newInstance();
@@ -50,7 +50,7 @@ public class LogRepositoryFactory {
throw new RuntimeException(e);
}
- final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository);
+ final LogRepository oldRepository = repositoryMap.putIfAbsent(componentId, repository);
if (oldRepository != null) {
repository = oldRepository;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ffdd4e..b6edbbb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -137,11 +137,13 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -593,7 +595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
if (startDelayedComponents) {
- LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
+ LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
for (final Connectable connectable : startConnectablesAfterInitialization) {
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
continue;
@@ -1012,7 +1014,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isTerminated() {
this.readLock.lock();
try {
- return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated());
+ return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
} finally {
this.readLock.unlock();
}
@@ -1828,9 +1830,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else if (id1.equals(id2)) {
return true;
} else {
- final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1);
- final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2);
- return (comparable1.equals(comparable2));
+ final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1;
+ final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2;
+ return comparable1.equals(comparable2);
}
}
@@ -1964,7 +1966,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
- return (root == null) ? null : root.findProcessGroup(searchId);
+ return root == null ? null : root.findProcessGroup(searchId);
}
@Override
@@ -2079,8 +2081,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
- flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
- bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
+ flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
+ bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
}
if (StringUtils.isNotBlank(conn.getName())) {
@@ -2552,6 +2554,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
reportingTasks.put(id, taskNode);
+
+ // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
return taskNode;
}
@@ -2616,7 +2624,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+ // Register log observer to provide bulletins when the controller service logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+ return serviceNode;
}
@Override
@@ -3480,7 +3495,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (bulletin.getGroupId() == null) {
escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
} else {
- escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(),
+ escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 92fa3b2..b5c3855 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -48,11 +48,12 @@ public class ControllerServiceLoader {
private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
public static List<ControllerServiceNode> loadControllerServices(
- final ControllerServiceProvider provider,
- final InputStream serializedStream,
- final StringEncryptor encryptor,
- final BulletinRepository bulletinRepo,
- final boolean autoResumeState) throws IOException {
+ final ControllerServiceProvider provider,
+ final InputStream serializedStream,
+ final StringEncryptor encryptor,
+ final BulletinRepository bulletinRepo,
+ final boolean autoResumeState) throws IOException {
+
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
documentBuilderFactory.setNamespaceAware(true);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index a20e974..a58bf8e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.Filter;
@@ -167,7 +168,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
- return (buffer == null) ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+ return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
@Override
public boolean select(final Bulletin bulletin) {
return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
@@ -199,7 +200,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(groupId);
if (componentMap == null) {
componentMap = new ConcurrentHashMap<>();
- ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap);
+ final ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap);
if (existing != null) {
componentMap = existing;
}
@@ -225,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
private boolean isControllerBulletin(final Bulletin bulletin) {
- return bulletin.getGroupId() == null;
+ return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
}
private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
index 6f1dc45..17c5991 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
@@ -18,6 +18,8 @@ package org.apache.nifi.jaxb;
import java.util.Date;
+import org.apache.nifi.reporting.ComponentType;
+
/**
*
*/
@@ -32,6 +34,7 @@ public class AdaptedBulletin {
private String groupId;
private String sourceId;
private String sourceName;
+ private ComponentType sourceType;
public String getCategory() {
return category;
@@ -97,4 +100,11 @@ public class AdaptedBulletin {
this.timestamp = timestamp;
}
+ public ComponentType getSourceType() {
+ return sourceType;
+ }
+
+ public void setSourceType(ComponentType sourceType) {
+ this.sourceType = sourceType;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
index b699348..acbe0dd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
@@ -34,7 +34,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
if (b.getSourceId() == null) {
return BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage());
} else {
- return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
+ return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceType(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
}
}
@@ -48,6 +48,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
aBulletin.setTimestamp(b.getTimestamp());
aBulletin.setGroupId(b.getGroupId());
aBulletin.setSourceId(b.getSourceId());
+ aBulletin.setSourceType(b.getSourceType());
aBulletin.setSourceName(b.getSourceName());
aBulletin.setCategory(b.getCategory());
aBulletin.setLevel(b.getLevel());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
new file mode 100644
index 0000000..837e1c4
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ControllerServiceLogObserver implements LogObserver {
+ private final BulletinRepository bulletinRepository;
+ private final ControllerServiceNode serviceNode;
+
+ public ControllerServiceLogObserver(final BulletinRepository bulletinRepository, final ControllerServiceNode serviceNode) {
+ this.bulletinRepository = bulletinRepository;
+ this.serviceNode = serviceNode;
+ }
+
+ @Override
+ public void onLogMessage(final LogMessage message) {
+ // Bulletins use the Severity vocabulary, so LogLevel.WARN is reported as Severity.WARNING. Every other level is passed
+ // through under its own name (INFO and ERROR already match Severity; anything else is accepted as it is).
+ final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+ // The source is a controller service, so tag the bulletin CONTROLLER_SERVICE (not REPORTING_TASK); no group id is supplied.
+ final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.CONTROLLER_SERVICE,
+ serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+ bulletinRepository.addBulletin(bulletin);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
new file mode 100644
index 0000000..e5638d6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+public class ReportingTaskLogObserver implements LogObserver {
+ private final BulletinRepository bulletinRepository;
+ private final ReportingTaskNode taskNode;
+
+ public ReportingTaskLogObserver(final BulletinRepository bulletinRepository, final ReportingTaskNode taskNode) {
+ this.bulletinRepository = bulletinRepository;
+ this.taskNode = taskNode;
+ }
+
+ @Override
+ public void onLogMessage(final LogMessage message) {
+ // Bulletins use the Severity vocabulary, so LogLevel.WARN is reported as Severity.WARNING. Every other level is passed
+ // through under its own name (INFO and ERROR already match Severity; anything else is accepted as it is).
+ final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+ // No group id is supplied (first argument is null); the bulletin is identified by the task's id, type and name.
+ final Bulletin bulletin = BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
+ taskNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+ bulletinRepository.addBulletin(bulletin);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index d19b5c1..61516d0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -57,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -164,7 +165,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();
final String sourceId = StandardRemoteProcessGroup.this.getIdentifier();
final String sourceName = StandardRemoteProcessGroup.this.getName();
- bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+ bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, ComponentType.REMOTE_PROCESS_GROUP,
+ sourceName, category, severity.name(), message));
}
};
@@ -227,7 +229,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public String getName() {
final String name = this.name.get();
- return (name == null) ? targetUri.toString() : name;
+ return name == null ? targetUri.toString() : name;
}
@Override
@@ -671,7 +673,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private ProcessGroup getRootGroup(final ProcessGroup context) {
final ProcessGroup parent = context.getParent();
- return (parent == null) ? context : getRootGroup(parent);
+ return parent == null ? context : getRootGroup(parent);
}
private boolean isWebApiSecure() {
@@ -714,7 +716,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public Date getLastRefreshTime() {
readLock.lock();
try {
- return (refreshContentsTimestamp == null) ? null : new Date(refreshContentsTimestamp);
+ return refreshContentsTimestamp == null ? null : new Date(refreshContentsTimestamp);
} finally {
readLock.unlock();
}
@@ -855,7 +857,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
if (ports != null) {
remotePorts = new LinkedHashSet<>(ports.size());
- for (PortDTO port : ports) {
+ for (final PortDTO port : ports) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
final ScheduledState scheduledState = ScheduledState.valueOf(port.getState());
descriptor.setId(port.getId());
@@ -1093,7 +1095,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
final String remoteInstanceId = dto.getInstanceId();
- boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
+ final boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
pointsToCluster.set(isPointingToCluster);
} else if (statusCode == UNAUTHORIZED_STATUS_CODE) {
try {
@@ -1120,7 +1122,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
authorizationIssue = "Unable to determine Site-to-Site availability.";
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
StandardRemoteProcessGroup.this.getTargetUri().toString(), e));
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/f1adb8bf/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4bb1683..9eadec0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -61,6 +61,7 @@ import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.user.NiFiUser;
@@ -108,7 +109,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
final String sourceId = StandardRootGroupPort.this.getIdentifier();
final String sourceName = StandardRootGroupPort.this.getName();
- bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+ final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
+ bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, componentType, sourceName, category, severity.name(), message));
}
};
[06/10] incubator-nifi git commit: NIFI-724: Ensure that bulletins
generated for reporting tasks and controller services are shown at Controller
level as well as component level
Posted by mc...@apache.org.
NIFI-724: Ensure that bulletins generated for reporting tasks and controller services are shown at Controller level as well as component level
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e7c0461b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e7c0461b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e7c0461b
Branch: refs/heads/develop
Commit: e7c0461b15bff045d68e7ae8814eda2073cba209
Parents: 59aa8ff
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jul 1 12:54:18 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Jul 1 12:54:18 2015 -0400
----------------------------------------------------------------------
.../nifi/events/VolatileBulletinRepository.java | 105 ++++++++++++++-----
1 file changed, 78 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e7c0461b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index c18fffd..8aeb34d 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -36,6 +36,8 @@ public class VolatileBulletinRepository implements BulletinRepository {
private static final int CONTROLLER_BUFFER_SIZE = 10;
private static final int COMPONENT_BUFFER_SIZE = 5;
private static final String CONTROLLER_BULLETIN_STORE_KEY = "CONTROLLER";
+ private static final String SERVICE_BULLETIN_STORE_KEY = "SERVICE";
+ private static final String REPORTING_TASK_BULLETIN_STORE_KEY = "REPORTING_TASK";
private final ConcurrentMap<String, ConcurrentMap<String, RingBuffer<Bulletin>>> bulletinStoreMap = new ConcurrentHashMap<>();
private volatile BulletinProcessingStrategy processingStrategy = new DefaultBulletinProcessingStrategy();
@@ -170,18 +172,39 @@ public class VolatileBulletinRepository implements BulletinRepository {
public List<Bulletin> findBulletinsForController(final int max) {
final long fiveMinutesAgo = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(5);
- final ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
- if (componentMap == null) {
- return Collections.<Bulletin>emptyList();
- }
-
- final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
- return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+ final Filter<Bulletin> filter = new Filter<Bulletin>() {
@Override
public boolean select(final Bulletin bulletin) {
return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
}
- }, max);
+ };
+
+ final List<Bulletin> controllerBulletins = new ArrayList<>();
+
+ final ConcurrentMap<String, RingBuffer<Bulletin>> controllerBulletinMap = bulletinStoreMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+ if (controllerBulletinMap != null) {
+ final RingBuffer<Bulletin> buffer = controllerBulletinMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+ if (buffer != null) {
+ controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+ }
+ }
+
+ for (final String key : new String[] { SERVICE_BULLETIN_STORE_KEY, REPORTING_TASK_BULLETIN_STORE_KEY }) {
+ final ConcurrentMap<String, RingBuffer<Bulletin>> bulletinMap = bulletinStoreMap.get(key);
+ if (bulletinMap != null) {
+ for (final RingBuffer<Bulletin> buffer : bulletinMap.values()) {
+ controllerBulletins.addAll(buffer.getSelectedElements(filter, max));
+ }
+ }
+ }
+
+ // We only want the newest bulletins, so we sort based on time and take the top 'max' entries
+ Collections.sort(controllerBulletins);
+ if (controllerBulletins.size() > max) {
+ return controllerBulletins.subList(0, max);
+ }
+
+ return controllerBulletins;
}
/**
@@ -203,7 +226,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
this.processingStrategy = new DefaultBulletinProcessingStrategy();
}
- private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) {
+ private List<RingBuffer<Bulletin>> getBulletinBuffers(final Bulletin bulletin) {
final String storageKey = getBulletinStoreKey(bulletin);
ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey);
@@ -215,40 +238,68 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
}
- final boolean controllerBulletin = isControllerBulletin(bulletin);
- final String sourceId = controllerBulletin ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getSourceId();
- RingBuffer<Bulletin> bulletinBuffer = componentMap.get(sourceId);
- if (bulletinBuffer == null) {
- final int bufferSize = controllerBulletin ? CONTROLLER_BUFFER_SIZE : COMPONENT_BUFFER_SIZE;
- bulletinBuffer = new RingBuffer<>(bufferSize);
- final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(sourceId, bulletinBuffer);
- if (existingBuffer != null) {
- bulletinBuffer = existingBuffer;
+ final List<RingBuffer<Bulletin>> buffers = new ArrayList<>(2);
+
+ if (isControllerBulletin(bulletin)) {
+ RingBuffer<Bulletin> bulletinBuffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
+ if (bulletinBuffer == null) {
+ bulletinBuffer = new RingBuffer<>(CONTROLLER_BUFFER_SIZE);
+ final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(CONTROLLER_BULLETIN_STORE_KEY, bulletinBuffer);
+ if (existingBuffer != null) {
+ bulletinBuffer = existingBuffer;
+ }
}
+
+ buffers.add(bulletinBuffer);
}
- return bulletinBuffer;
+ if (bulletin.getSourceType() != ComponentType.FLOW_CONTROLLER) {
+ RingBuffer<Bulletin> bulletinBuffer = componentMap.get(bulletin.getSourceId());
+ if (bulletinBuffer == null) {
+ bulletinBuffer = new RingBuffer<>(COMPONENT_BUFFER_SIZE);
+ final RingBuffer<Bulletin> existingBuffer = componentMap.putIfAbsent(bulletin.getSourceId(), bulletinBuffer);
+ if (existingBuffer != null) {
+ bulletinBuffer = existingBuffer;
+ }
+ }
+
+ buffers.add(bulletinBuffer);
+ }
+
+ return buffers;
}
private String getBulletinStoreKey(final Bulletin bulletin) {
- if (isControllerBulletin(bulletin)) {
- return CONTROLLER_BULLETIN_STORE_KEY;
+ switch (bulletin.getSourceType()) {
+ case FLOW_CONTROLLER:
+ return CONTROLLER_BULLETIN_STORE_KEY;
+ case CONTROLLER_SERVICE:
+ return SERVICE_BULLETIN_STORE_KEY;
+ case REPORTING_TASK:
+ return REPORTING_TASK_BULLETIN_STORE_KEY;
+ default:
+ return bulletin.getGroupId();
}
-
- final String groupId = bulletin.getGroupId();
- return groupId == null ? bulletin.getSourceId() : groupId;
}
private boolean isControllerBulletin(final Bulletin bulletin) {
- return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
+ switch (bulletin.getSourceType()) {
+ case FLOW_CONTROLLER:
+ case CONTROLLER_SERVICE:
+ case REPORTING_TASK:
+ return true;
+ default:
+ return false;
+ }
}
private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {
@Override
public void update(final Bulletin bulletin) {
- final RingBuffer<Bulletin> bulletinBuffer = getBulletinBuffer(bulletin);
- bulletinBuffer.add(bulletin);
+ for (final RingBuffer<Bulletin> bulletinBuffer : getBulletinBuffers(bulletin)) {
+ bulletinBuffer.add(bulletin);
+ }
}
}
}
[03/10] incubator-nifi git commit: NIFI-724: merged changes
Posted by mc...@apache.org.
NIFI-724: merged changes
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/0a7ab1a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/0a7ab1a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/0a7ab1a0
Branch: refs/heads/develop
Commit: 0a7ab1a06dd8b7164413bdcd0a85a4c457d3da71
Parents: e240e07 f1adb8b
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Jun 25 10:58:01 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jun 25 10:58:01 2015 -0400
----------------------------------------------------------------------
----------------------------------------------------------------------
[08/10] incubator-nifi git commit: NIFI-694: - Ensuring bulletins are
not lost when updating a reporting task.
Posted by mc...@apache.org.
NIFI-694:
- Ensuring bulletins are not lost when updating a reporting task.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/c3dff409
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/c3dff409
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/c3dff409
Branch: refs/heads/develop
Commit: c3dff409024d00208c68323ad1989bd2b93b7c0c
Parents: 4fd1e94
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jul 2 10:22:42 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 2 10:22:42 2015 -0400
----------------------------------------------------------------------
.../src/main/webapp/js/nf/canvas/nf-reporting-task.js | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/c3dff409/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
index 998213e..da78e82 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-reporting-task.js
@@ -177,7 +177,10 @@ nf.ReportingTask = (function () {
// get the table and update the row accordingly
var reportingTaskGrid = $('#reporting-tasks-table').data('gridInstance');
var reportingTaskData = reportingTaskGrid.getData();
- reportingTaskData.updateItem(reportingTask.id, reportingTask);
+ var currentReportingTask = reportingTaskData.getItemById(reportingTask.id);
+ reportingTaskData.updateItem(reportingTask.id, $.extend({
+ bulletins: currentReportingTask.bulletins
+ }, reportingTask));
};
/**
[05/10] incubator-nifi git commit: NIFI-694: - Showing bulletins in
the controller service and reporting task tables. - Fixed issue typo when
emitting controller service bulletins.
Posted by mc...@apache.org.
NIFI-694:
- Showing bulletins in the controller service and reporting task tables.
- Fixed issue typo when emitting controller service bulletins.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/59aa8ffe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/59aa8ffe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/59aa8ffe
Branch: refs/heads/develop
Commit: 59aa8ffe100645498fdd4cea2214a203e3a3d433
Parents: 7a28903
Author: Matt Gilman <ma...@gmail.com>
Authored: Tue Jun 30 14:14:57 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Tue Jun 30 14:14:57 2015 -0400
----------------------------------------------------------------------
.../apache/nifi/reporting/BulletinQuery.java | 12 ++
.../web/api/dto/status/ControllerStatusDTO.java | 33 +++-
.../nifi/events/VolatileBulletinRepository.java | 13 +-
.../logging/ControllerServiceLogObserver.java | 5 +-
.../nifi/web/StandardNiFiServiceFacade.java | 19 +-
.../org/apache/nifi/web/api/dto/DtoFactory.java | 22 ++-
.../nifi/web/controller/ControllerFacade.java | 43 ++--
.../src/main/webapp/css/controller-service.css | 9 +-
.../src/main/webapp/js/nf/canvas/nf-canvas.js | 3 +
.../js/nf/canvas/nf-controller-service.js | 9 +-
.../src/main/webapp/js/nf/canvas/nf-settings.js | 194 +++++++++++++++++--
11 files changed, 313 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
index cb5d7b3..7ba2089 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/BulletinQuery.java
@@ -23,6 +23,7 @@ import java.util.regex.Pattern;
*/
public class BulletinQuery {
+ private final ComponentType sourceType;
private final Pattern sourceIdPattern;
private final Pattern groupIdPattern;
private final Pattern namePattern;
@@ -31,6 +32,7 @@ public class BulletinQuery {
private final Integer limit;
private BulletinQuery(final Builder builder) {
+ this.sourceType = builder.sourceType;
this.sourceIdPattern = builder.sourceIdPattern == null ? null : Pattern.compile(builder.sourceIdPattern);
this.groupIdPattern = builder.groupIdPattern == null ? null : Pattern.compile(builder.groupIdPattern);
this.namePattern = builder.namePattern == null ? null : Pattern.compile(builder.namePattern);
@@ -39,6 +41,10 @@ public class BulletinQuery {
this.limit = builder.limit;
}
+ public ComponentType getSourceType() {
+ return sourceType;
+ }
+
public Pattern getSourceIdPattern() {
return sourceIdPattern;
}
@@ -65,6 +71,7 @@ public class BulletinQuery {
public static class Builder {
+ private ComponentType sourceType;
private String sourceIdPattern;
private String groupIdPattern;
private String namePattern;
@@ -87,6 +94,11 @@ public class BulletinQuery {
return this;
}
+ public Builder sourceType(ComponentType sourceType) {
+ this.sourceType = sourceType;
+ return this;
+ }
+
public Builder nameMatches(String name) {
this.namePattern = name;
return this;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
index 7afc7bc..5d5eddf 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ControllerStatusDTO.java
@@ -40,6 +40,8 @@ public class ControllerStatusDTO {
private Integer inactiveRemotePortCount;
private List<BulletinDTO> bulletins;
+ private List<BulletinDTO> controllerServiceBulletins;
+ private List<BulletinDTO> reportingTaskBulletins;
/**
* The active thread count.
@@ -72,7 +74,8 @@ public class ControllerStatusDTO {
}
/**
- * @return Used in clustering, will report the number of nodes connected vs the number of nodes in the cluster
+ * @return Used in clustering, will report the number of nodes connected vs
+ * the number of nodes in the cluster
*/
@ApiModelProperty(
value = "When clustered, reports the number of nodes connected vs the number of nodes in the cluster."
@@ -100,6 +103,34 @@ public class ControllerStatusDTO {
}
/**
+ * @return Controller service bulletins to be reported to the user
+ */
+ @ApiModelProperty(
+ value = "Controller service bulletins to be reported to the user."
+ )
+ public List<BulletinDTO> getControllerServiceBulletins() {
+ return controllerServiceBulletins;
+ }
+
+ public void setControllerServiceBulletins(List<BulletinDTO> controllerServiceBulletins) {
+ this.controllerServiceBulletins = controllerServiceBulletins;
+ }
+
+ /**
+ * @return Reporting task bulletins to be reported to the user
+ */
+ @ApiModelProperty(
+ value = "Reporting task bulletins to be reported to the user."
+ )
+ public List<BulletinDTO> getReportingTaskBulletins() {
+ return reportingTaskBulletins;
+ }
+
+ public void setReportingTaskBulletins(List<BulletinDTO> reportingTaskBulletins) {
+ this.reportingTaskBulletins = reportingTaskBulletins;
+ }
+
+ /**
* @return whether or not there are pending user requests
*/
@ApiModelProperty(
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index 5172d34..c18fffd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -102,6 +102,14 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
}
+ // if a source component type was specified see if it should be excluded
+ if (bulletinQuery.getSourceType() != null) {
+ // exclude if this bulletin source type doesn't match
+ if (bulletin.getSourceType() == null || !bulletinQuery.getSourceType().equals(bulletin.getSourceType())) {
+ return false;
+ }
+ }
+
return true;
}
};
@@ -177,8 +185,9 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
/**
- * Overrides the default bulletin processing strategy. When a custom bulletin strategy is employed, bulletins will not be persisted in this repository and will sent to the specified strategy
- * instead.
+ * Overrides the default bulletin processing strategy. When a custom
+ * bulletin strategy is employed, bulletins will not be persisted in this
+ * repository and will be sent to the specified strategy instead.
*
* @param strategy bulletin strategy
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
index 837e1c4..d9eaa12 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -24,6 +24,7 @@ import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
public class ControllerServiceLogObserver implements LogObserver {
+
private final BulletinRepository bulletinRepository;
private final ControllerServiceNode serviceNode;
@@ -38,8 +39,8 @@ public class ControllerServiceLogObserver implements LogObserver {
// the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
- final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.REPORTING_TASK,
- serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+ final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.CONTROLLER_SERVICE,
+ serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
bulletinRepository.addBulletin(bulletin);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 903b872..4e50ff3 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -159,6 +159,7 @@ import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceReference;
import org.apache.nifi.controller.service.ControllerServiceState;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.api.dto.ControllerServiceDTO;
import org.apache.nifi.web.api.dto.ControllerServiceReferencingComponentDTO;
import org.apache.nifi.web.api.dto.PropertyDescriptorDTO;
@@ -1578,12 +1579,15 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
// get the bulletins for the controller
final BulletinRepository bulletinRepository = clusterManager.getBulletinRepository();
- final List<Bulletin> results = bulletinRepository.findBulletinsForController();
- final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
- for (final Bulletin bulletin : results) {
- bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
- }
- controllerStatus.setBulletins(bulletinDtos);
+ controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
+
+ // get the controller service bulletins
+ final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+ controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
+
+ // get the reporting task bulletins
+ final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+ controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
// get the component counts by extracting them from the roots' group status
final ProcessGroupStatus status = clusterManager.getProcessGroupStatus("root");
@@ -3019,7 +3023,8 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
}
/**
- * Utility method for extracting component counts from the specified group status.
+ * Utility method for extracting component counts from the specified group
+ * status.
*/
private ProcessGroupCounts extractProcessGroupCounts(ProcessGroupStatus groupStatus) {
int running = 0;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 54f11fb..76bce6f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -1487,6 +1487,20 @@ public final class DtoFactory {
}
/**
+ * Creates BulletinDTOs for the specified Bulletins.
+ *
+ * @param bulletins the bulletins to convert
+ * @return a list containing one DTO per bulletin, in the same order
+ */
+ public List<BulletinDTO> createBulletinDtos(final List<Bulletin> bulletins) {
+ final List<BulletinDTO> bulletinDtos = new ArrayList<>(bulletins.size());
+ for (final Bulletin bulletin : bulletins) {
+ bulletinDtos.add(createBulletinDto(bulletin));
+ }
+ return bulletinDtos;
+ }
+
+ /**
* Creates a BulletinDTO for the specified Bulletin.
*
* @param bulletin bulletin
@@ -1507,7 +1521,8 @@ public final class DtoFactory {
}
/**
- * Creates a ProvenanceEventNodeDTO for the specified ProvenanceEventLineageNode.
+ * Creates a ProvenanceEventNodeDTO for the specified
+ * ProvenanceEventLineageNode.
*
* @param node node
* @return dto
@@ -2158,8 +2173,9 @@ public final class DtoFactory {
/**
*
* @param original orig
- * @param deep if <code>true</code>, all Connections, ProcessGroups, Ports, Processors, etc. will be copied. If <code>false</code>, the copy will have links to the same objects referenced by
- * <code>original</code>.
+ * @param deep if <code>true</code>, all Connections, ProcessGroups, Ports,
+ * Processors, etc. will be copied. If <code>false</code>, the copy will
+ * have links to the same objects referenced by <code>original</code>.
*
* @return dto
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index b614f0a..8bf5553 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -75,7 +75,6 @@ import org.apache.nifi.provenance.search.SearchTerm;
import org.apache.nifi.provenance.search.SearchTerms;
import org.apache.nifi.provenance.search.SearchableField;
import org.apache.nifi.remote.RootGroupPort;
-import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.ReportingTask;
import org.apache.nifi.scheduling.SchedulingStrategy;
@@ -88,7 +87,6 @@ import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.web.NiFiCoreException;
import org.apache.nifi.web.ResourceNotFoundException;
-import org.apache.nifi.web.api.dto.BulletinDTO;
import org.apache.nifi.web.api.dto.DocumentedTypeDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.provenance.AttributeDTO;
@@ -113,6 +111,8 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.DownloadAuthorization;
import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.reporting.BulletinQuery;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.web.security.user.NiFiUserUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -441,12 +441,15 @@ public class ControllerFacade {
controllerStatus.setQueued(FormatUtils.formatCount(controllerQueueSize.getObjectCount()) + " / " + FormatUtils.formatDataSize(controllerQueueSize.getByteCount()));
final BulletinRepository bulletinRepository = getBulletinRepository();
- final List<Bulletin> results = bulletinRepository.findBulletinsForController();
- final List<BulletinDTO> bulletinDtos = new ArrayList<>(results.size());
- for (final Bulletin bulletin : results) {
- bulletinDtos.add(dtoFactory.createBulletinDto(bulletin));
- }
- controllerStatus.setBulletins(bulletinDtos);
+ controllerStatus.setBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletinsForController()));
+
+ // get the controller service bulletins
+ final BulletinQuery controllerServiceQuery = new BulletinQuery.Builder().sourceType(ComponentType.CONTROLLER_SERVICE).build();
+ controllerStatus.setControllerServiceBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(controllerServiceQuery)));
+
+ // get the reporting task bulletins
+ final BulletinQuery reportingTaskQuery = new BulletinQuery.Builder().sourceType(ComponentType.REPORTING_TASK).build();
+ controllerStatus.setReportingTaskBulletins(dtoFactory.createBulletinDtos(bulletinRepository.findBulletins(reportingTaskQuery)));
final ProcessGroupCounts counts = rootGroup.getCounts();
controllerStatus.setRunningCount(counts.getRunningCount());
@@ -494,36 +497,44 @@ public class ControllerFacade {
}
/**
- * Returns the socket port that the Cluster Manager is listening on for Site-to-Site communications
+ * Returns the socket port that the Cluster Manager is listening on for
+ * Site-to-Site communications
*
- * @return the socket port that the Cluster Manager is listening on for Site-to-Site communications
+ * @return the socket port that the Cluster Manager is listening on for
+ * Site-to-Site communications
*/
public Integer getClusterManagerRemoteSiteListeningPort() {
return flowController.getClusterManagerRemoteSiteListeningPort();
}
/**
- * Indicates whether or not Site-to-Site communications with the Cluster Manager are secure
+ * Indicates whether or not Site-to-Site communications with the Cluster
+ * Manager are secure
*
- * @return whether or not Site-to-Site communications with the Cluster Manager are secure
+ * @return whether or not Site-to-Site communications with the Cluster
+ * Manager are secure
*/
public Boolean isClusterManagerRemoteSiteCommsSecure() {
return flowController.isClusterManagerRemoteSiteCommsSecure();
}
/**
- * Returns the socket port that the local instance is listening on for Site-to-Site communications
+ * Returns the socket port that the local instance is listening on for
+ * Site-to-Site communications
*
- * @return the socket port that the local instance is listening on for Site-to-Site communications
+ * @return the socket port that the local instance is listening on for
+ * Site-to-Site communications
*/
public Integer getRemoteSiteListeningPort() {
return flowController.getRemoteSiteListeningPort();
}
/**
- * Indicates whether or not Site-to-Site communications with the local instance are secure
+ * Indicates whether or not Site-to-Site communications with the local
+ * instance are secure
*
- * @return whether or not Site-to-Site communications with the local instance are secure
+ * @return whether or not Site-to-Site communications with the local
+ * instance are secure
*/
public Boolean isRemoteSiteCommsSecure() {
return flowController.isRemoteSiteCommsSecure();
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
index 4a82810..9d87994 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
@@ -133,9 +133,12 @@ div.referencing-component-bulletins {
display: none;
}
-span.service.expansion-button {
+div.service.expansion-button {
+ float: left;
margin-right: 4px;
margin-top: 2px;
+ width: 10px;
+ height: 10px;
}
span.referencing-component-active-thread-count {
@@ -197,6 +200,8 @@ div.referencing-component-references {
#disable-controller-service-name {
float: left;
+ max-width: 280px;
+ text-overflow: ellipsis;
}
#disable-controller-service-bulletins {
@@ -246,6 +251,8 @@ div.referencing-component-references {
#enable-controller-service-name {
float: left;
+ max-width: 280px;
+ text-overflow: ellipsis;
}
#enable-controller-service-bulletins {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
index ca45a3d..06ef89f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
@@ -833,6 +833,9 @@ nf.Canvas = (function () {
bulletinIcon.show();
}
}
+
+ // update controller service and reporting task bulletins
+ nf.Settings.setBulletins(controllerStatus.controllerServiceBulletins, controllerStatus.reportingTaskBulletins);
// handle any pending user request
if (controllerStatus.hasPendingAccounts === true) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index baa9edc..efcf9fb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -149,7 +149,10 @@ nf.ControllerService = (function () {
// get the table and update the row accordingly
var controllerServiceGrid = $('#controller-services-table').data('gridInstance');
var controllerServiceData = controllerServiceGrid.getData();
- controllerServiceData.updateItem(controllerService.id, controllerService);
+ var currentControllerService = controllerServiceData.getItemById(controllerService.id);
+ controllerServiceData.updateItem(controllerService.id, $.extend({
+ bulletins: currentControllerService.bulletins
+ }, controllerService));
};
/**
@@ -426,7 +429,7 @@ nf.ControllerService = (function () {
// container for this service's references
var referencingServiceReferencesContainer = $('<div class="referencing-component-references hidden"></div>');
- var serviceTwist = $('<span class="service expansion-button collapsed pointer"></span>').on('click', function() {
+ var serviceTwist = $('<div class="service expansion-button collapsed pointer"></div>').on('click', function() {
if (serviceTwist.hasClass('collapsed')) {
var controllerServiceGrid = $('#controller-services-table').data('gridInstance');
var controllerServiceData = controllerServiceGrid.getData();
@@ -511,7 +514,7 @@ nf.ControllerService = (function () {
return;
}
- var twist = $('<span class="expansion-button expanded"></span>');
+ var twist = $('<div class="expansion-button expanded"></div>');
var title = $('<span class="referencing-component-title"></span>').text(titleText);
var count = $('<span class="referencing-component-count"></span>').text('(' + list.children().length + ')');
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/59aa8ffe/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index d323892..3715110 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-/* global nf, Slick */
+/* global nf, Slick, d3 */
nf.Settings = (function () {
@@ -600,9 +600,21 @@ nf.Settings = (function () {
// more details formatter
var moreControllerServiceDetails = function (row, cell, value, columnDef, dataContext) {
var markup = '<img src="images/iconDetails.png" title="View Details" class="pointer view-controller-service" style="margin-top: 5px; float: left;" />';
- if (!nf.Common.isEmpty(dataContext.validationErrors)) {
- markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 4px; float: left;" /><span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
+ var hasErrors = !nf.Common.isEmpty(dataContext.validationErrors);
+ var hasBulletins = !nf.Common.isEmpty(dataContext.bulletins);
+
+ if (hasErrors) {
+ markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 1px; float: left;" />';
+ }
+
+ if (hasBulletins) {
+ markup += '<img src="images/iconBulletin.png" class="has-bulletins" style="margin-top: 5px; margin-left: 5px; float: left;"/>';
+ }
+
+ if (hasErrors || hasBulletins) {
+ markup += '<span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
}
+
return markup;
};
@@ -642,7 +654,12 @@ nf.Settings = (function () {
markup += '<img src="images/iconDisable.png" title="Disable" class="pointer disable-controller-service" style="margin-top: 2px;" /> ';
} else if (dataContext.state === 'DISABLED') {
markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-controller-service" style="margin-top: 2px;" /> ';
- markup += '<img src="images/iconEnable.png" title="Enable" class="pointer enable-controller-service" style="margin-top: 2px;"/> ';
+
+ // if there are no validation errors allow enabling
+ if (nf.Common.isEmpty(dataContext.validationErrors)) {
+ markup += '<img src="images/iconEnable.png" title="Enable" class="pointer enable-controller-service" style="margin-top: 2px;"/> ';
+ }
+
markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-controller-service" style="margin-top: 2px;" /> ';
}
}
@@ -655,7 +672,7 @@ nf.Settings = (function () {
// define the column model for the controller services table
var controllerServicesColumns = [
- {id: 'moreDetails', name: ' ', resizable: false, formatter: moreControllerServiceDetails, sortable: false, width: 50, maxWidth: 50},
+ {id: 'moreDetails', name: ' ', resizable: false, formatter: moreControllerServiceDetails, sortable: false, width: 65, maxWidth: 65},
{id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true},
{id: 'type', field: 'type', name: 'Type', formatter: typeFormatter, sortable: true, resizable: true},
{id: 'state', field: 'state', name: 'State', formatter: controllerServiceStateFormatter, sortable: true, resizeable: true}
@@ -767,6 +784,34 @@ nf.Settings = (function () {
}, nf.Common.config.tooltipConfig));
}
}
+
+ var bulletinIcon = $(this).find('img.has-bulletins');
+ if (bulletinIcon.length && !bulletinIcon.data('qtip')) {
+ var taskId = $(this).find('span.row-id').text();
+
+ // get the task item
+ var item = controllerServicesData.getItemById(taskId);
+
+ // format the tooltip
+ var bulletins = nf.Common.getFormattedBulletins(item.bulletins);
+ var tooltip = nf.Common.formatUnorderedList(bulletins);
+
+ // show the tooltip
+ if (nf.Common.isDefinedAndNotNull(tooltip)) {
+ bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
+ content: tooltip,
+ position: {
+ target: 'mouse',
+ viewport: $(window),
+ adjust: {
+ x: 8,
+ y: 8,
+ method: 'flipinvert flipinvert'
+ }
+ }
+ }));
+ }
+ }
});
};
@@ -785,7 +830,9 @@ nf.Settings = (function () {
var nodeServices = response.controllerServices;
if (nf.Common.isDefinedAndNotNull(nodeServices)) {
$.each(nodeServices, function (_, nodeService) {
- services.push(nodeService);
+ services.push($.extend({
+ bulletins: []
+ }, nodeService));
});
}
});
@@ -801,7 +848,9 @@ nf.Settings = (function () {
var ncmServices = response.controllerServices;
if (nf.Common.isDefinedAndNotNull(ncmServices)) {
$.each(ncmServices, function (_, ncmService) {
- services.push(ncmService);
+ services.push($.extend({
+ bulletins: []
+ }, ncmService));
});
}
deferred.resolve();
@@ -817,6 +866,7 @@ nf.Settings = (function () {
return $.when(nodeControllerServices, ncmControllerServices).done(function () {
var controllerServicesElement = $('#controller-services-table');
nf.Common.cleanUpTooltips(controllerServicesElement, 'img.has-errors');
+ nf.Common.cleanUpTooltips(controllerServicesElement, 'img.has-bulletins');
var controllerServicesGrid = controllerServicesElement.data('gridInstance');
var controllerServicesData = controllerServicesGrid.getData();
@@ -1198,9 +1248,21 @@ nf.Settings = (function () {
var moreReportingTaskDetails = function (row, cell, value, columnDef, dataContext) {
var markup = '<img src="images/iconDetails.png" title="View Details" class="pointer view-reporting-task" style="margin-top: 5px; float: left;" />';
- if (!nf.Common.isEmpty(dataContext.validationErrors)) {
- markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 4px; float: left" /><span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
+ var hasErrors = !nf.Common.isEmpty(dataContext.validationErrors);
+ var hasBulletins = !nf.Common.isEmpty(dataContext.bulletins);
+
+ if (hasErrors) {
+ markup += '<img src="images/iconAlert.png" class="has-errors" style="margin-top: 4px; margin-left: 1px; float: left;" />';
}
+
+ if (hasBulletins) {
+ markup += '<img src="images/iconBulletin.png" class="has-bulletins" style="margin-top: 5px; margin-left: 5px; float: left;"/>';
+ }
+
+ if (hasErrors || hasBulletins) {
+ markup += '<span class="hidden row-id">' + nf.Common.escapeHtml(dataContext.id) + '</span>';
+ }
+
return markup;
};
@@ -1239,7 +1301,12 @@ nf.Settings = (function () {
markup += '<img src="images/iconStop.png" title="Stop" class="pointer stop-reporting-task" style="margin-top: 2px;" /> ';
} else if (dataContext.state === 'STOPPED' || dataContext.state === 'DISABLED') {
markup += '<img src="images/iconEdit.png" title="Edit" class="pointer edit-reporting-task" style="margin-top: 2px;" /> ';
- markup += '<img src="images/iconRun.png" title="Start" class="pointer start-reporting-task" style="margin-top: 2px;"/> ';
+
+ // support starting when stopped and no validation errors
+ if (dataContext.state === 'STOPPED' && nf.Common.isEmpty(dataContext.validationErrors)) {
+ markup += '<img src="images/iconRun.png" title="Start" class="pointer start-reporting-task" style="margin-top: 2px;"/> ';
+ }
+
markup += '<img src="images/iconDelete.png" title="Remove" class="pointer delete-reporting-task" style="margin-top: 2px;" /> ';
}
}
@@ -1252,7 +1319,7 @@ nf.Settings = (function () {
// define the column model for the reporting tasks table
var reportingTasksColumnModel = [
- {id: 'moreDetails', field: 'moreDetails', name: ' ', resizable: false, formatter: moreReportingTaskDetails, sortable: true, width: 50, maxWidth: 50},
+ {id: 'moreDetails', field: 'moreDetails', name: ' ', resizable: false, formatter: moreReportingTaskDetails, sortable: true, width: 65, maxWidth: 65},
{id: 'name', field: 'name', name: 'Name', sortable: true, resizable: true},
{id: 'type', field: 'type', name: 'Type', sortable: true, resizable: true, formatter: typeFormatter},
{id: 'state', field: 'state', name: 'Run Status', sortable: true, resizeable: true, formatter: reportingTaskRunStatusFormatter}
@@ -1342,7 +1409,7 @@ nf.Settings = (function () {
if (errorIcon.length && !errorIcon.data('qtip')) {
var taskId = $(this).find('span.row-id').text();
- // get the service item
+ // get the task item
var item = reportingTasksData.getItemById(taskId);
// format the errors
@@ -1364,6 +1431,34 @@ nf.Settings = (function () {
}, nf.Common.config.tooltipConfig));
}
}
+
+ var bulletinIcon = $(this).find('img.has-bulletins');
+ if (bulletinIcon.length && !bulletinIcon.data('qtip')) {
+ var taskId = $(this).find('span.row-id').text();
+
+ // get the task item
+ var item = reportingTasksData.getItemById(taskId);
+
+ // format the tooltip
+ var bulletins = nf.Common.getFormattedBulletins(item.bulletins);
+ var tooltip = nf.Common.formatUnorderedList(bulletins);
+
+ // show the tooltip
+ if (nf.Common.isDefinedAndNotNull(tooltip)) {
+ bulletinIcon.qtip($.extend({}, nf.Common.config.tooltipConfig, {
+ content: tooltip,
+ position: {
+ target: 'mouse',
+ viewport: $(window),
+ adjust: {
+ x: 8,
+ y: 8,
+ method: 'flipinvert flipinvert'
+ }
+ }
+ }));
+ }
+ }
});
};
@@ -1382,7 +1477,9 @@ nf.Settings = (function () {
var nodeTasks = response.reportingTasks;
if (nf.Common.isDefinedAndNotNull(nodeTasks)) {
$.each(nodeTasks, function (_, nodeTask) {
- tasks.push(nodeTask);
+ tasks.push($.extend({
+ bulletins: []
+ }, nodeTask));
});
}
});
@@ -1414,6 +1511,7 @@ nf.Settings = (function () {
return $.when(nodeReportingTasks, ncmReportingTasks).done(function () {
var reportingTasksElement = $('#reporting-tasks-table');
nf.Common.cleanUpTooltips(reportingTasksElement, 'img.has-errors');
+ nf.Common.cleanUpTooltips(reportingTasksElement, 'img.has-bulletins');
var reportingTasksGrid = reportingTasksElement.data('gridInstance');
var reportingTasksData = reportingTasksGrid.getData();
@@ -1580,7 +1678,75 @@ nf.Settings = (function () {
var reportingTasks = loadReportingTasks();
// return a deferred for all parts of the settings
- return $.when(settings, controllerServices, reportingTasks).fail(nf.Common.handleAjaxError);
+ return $.when(settings, controllerServices, reportingTasks).done(nf.Canvas.reloadStatus).fail(nf.Common.handleAjaxError);
+ },
+
+ /**
+ * Sets the controller service and reporting task bulletins in their respective tables.
+ *
+ * @param {object} controllerServiceBulletins
+ * @param {object} reportingTaskBulletins
+ */
+ setBulletins: function(controllerServiceBulletins, reportingTaskBulletins) {
+ // controller services
+ var controllerServicesGrid = $('#controller-services-table').data('gridInstance');
+ var controllerServicesData = controllerServicesGrid.getData();
+ controllerServicesData.beginUpdate();
+
+ // if there are some bulletins process them
+ if (!nf.Common.isEmpty(controllerServiceBulletins)) {
+ var controllerServiceBulletinsBySource = d3.nest()
+ .key(function(d) { return d.sourceId; })
+ .map(controllerServiceBulletins, d3.map);
+
+ controllerServiceBulletinsBySource.forEach(function(sourceId, sourceBulletins) {
+ var controllerService = controllerServicesData.getItemById(sourceId);
+ if (nf.Common.isDefinedAndNotNull(controllerService)) {
+ controllerServicesData.updateItem(sourceId, $.extend(controllerService, {
+ bulletins: sourceBulletins
+ }));
+ }
+ });
+ } else {
+ // if there are no bulletins clear all
+ var controllerServices = controllerServicesData.getItems();
+ $.each(controllerServices, function(_, controllerService) {
+ controllerServicesData.updateItem(controllerService.id, $.extend(controllerService, {
+ bulletins: []
+ }));
+ });
+ }
+ controllerServicesData.endUpdate();
+
+ // reporting tasks
+ var reportingTasksGrid = $('#reporting-tasks-table').data('gridInstance');
+ var reportingTasksData = reportingTasksGrid.getData();
+ reportingTasksData.beginUpdate();
+
+ // if there are some bulletins process them
+ if (!nf.Common.isEmpty(reportingTaskBulletins)) {
+ var reportingTaskBulletinsBySource = d3.nest()
+ .key(function(d) { return d.sourceId; })
+ .map(reportingTaskBulletins, d3.map);
+
+ reportingTaskBulletinsBySource.forEach(function(sourceId, sourceBulletins) {
+ var reportingTask = reportingTasksData.getItemById(sourceId);
+ if (nf.Common.isDefinedAndNotNull(reportingTask)) {
+ reportingTasksData.updateItem(sourceId, $.extend(reportingTask, {
+ bulletins: sourceBulletins
+ }));
+ }
+ });
+ } else {
+ // if there are no bulletins clear all
+ var reportingTasks = reportingTasksData.getItems();
+ $.each(reportingTasks, function(_, reportingTask) {
+ controllerServicesData.updateItem(reportingTask.id, $.extend(reportingTask, {
+ bulletins: []
+ }));
+ });
+ }
+ reportingTasksData.endUpdate();
}
};
}());
\ No newline at end of file
[09/10] incubator-nifi git commit: NIFI-695: - Providing immediate
feedback when canceling a request to [en|dis]able a controller service. -
Reporting to the user after a request has been canceled.
Posted by mc...@apache.org.
NIFI-695:
- Providing immediate feedback when canceling a request to [en|dis]able a controller service.
- Reporting to the user after a request has been canceled.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/566c4fe8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/566c4fe8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/566c4fe8
Branch: refs/heads/develop
Commit: 566c4fe8d78b769fc31c8d5bed90daccbc817e27
Parents: c3dff40
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jul 2 13:45:14 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 2 13:45:14 2015 -0400
----------------------------------------------------------------------
.../disable-controller-service-dialog.jsp | 3 +++
.../canvas/enable-controller-service-dialog.jsp | 3 +++
.../src/main/webapp/css/controller-service.css | 6 +++++
.../js/nf/canvas/nf-controller-service.js | 24 ++++++++++++++++++++
4 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
index 1ecb730..0c77a10 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
@@ -70,4 +70,7 @@
</div>
</div>
</div>
+ <div class="controller-service-canceling hidden unset">
+ Canceling...
+ </div>
</div>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
index 6613a04..0121e21 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
@@ -69,4 +69,7 @@
</div>
</div>
</div>
+ <div class="controller-service-canceling hidden unset">
+ Canceling...
+ </div>
</div>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
index 9d87994..b0d980b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
@@ -161,6 +161,12 @@ div.referencing-component-references {
clear: left;
}
+div.controller-service-canceling {
+ position: absolute;
+ bottom: 10px;
+ right: 8px;
+}
+
/*
Comments
*/
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/566c4fe8/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index 078064a..75c7bee 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -1032,6 +1032,8 @@ nf.ControllerService = (function () {
handler: {
click: function () {
canceled = true;
+ disableDialog.modal('setButtonModel', []);
+ $('#disable-controller-service-dialog div.controller-service-canceling').show();
}
}
}]);
@@ -1053,6 +1055,7 @@ nf.ControllerService = (function () {
// sets the close button on the dialog
var setCloseButton = function () {
+ $('#disable-controller-service-dialog div.controller-service-canceling').hide();
disableDialog.modal('setButtonModel', [{
buttonText: 'Close',
handler: {
@@ -1100,6 +1103,15 @@ nf.ControllerService = (function () {
}).always(function () {
reloadControllerServiceAndReferencingComponents(controllerService);
setCloseButton();
+
+ // inform the user if the action was canceled
+ if (canceled === true && $('#nf-ok-dialog').not(':visible')) {
+ nf.Dialog.showOkDialog({
+ overlayBackground: false,
+ headerText: 'Action Canceled',
+ dialogContent: 'The request to disable has been canceled. Parts of this request may have already completed. Please verify the state of this service and all referencing components.'
+ });
+ }
});
};
@@ -1116,6 +1128,8 @@ nf.ControllerService = (function () {
handler: {
click: function () {
canceled = true;
+ enableDialog.modal('setButtonModel', []);
+ $('#enable-controller-service-dialog div.controller-service-canceling').show();
}
}
}]);
@@ -1143,6 +1157,7 @@ nf.ControllerService = (function () {
// sets the button to close
var setCloseButton = function () {
+ $('#enable-controller-service-dialog div.controller-service-canceling').hide();
enableDialog.modal('setButtonModel', [{
buttonText: 'Close',
handler: {
@@ -1200,6 +1215,15 @@ nf.ControllerService = (function () {
}).always(function () {
reloadControllerServiceAndReferencingComponents(controllerService);
setCloseButton();
+
+ // inform the user if the action was canceled
+ if (canceled === true && $('#nf-ok-dialog').not(':visible')) {
+ nf.Dialog.showOkDialog({
+ overlayBackground: false,
+ headerText: 'Action Canceled',
+ dialogContent: 'The request to enable has been canceled. Parts of this request may have already completed. Please verify the state of this service and all referencing components.'
+ });
+ }
});
};
[04/10] incubator-nifi git commit: NIFI-694: - Showing bulletins for
services and referencing components when enabling and disabling.
Posted by mc...@apache.org.
NIFI-694:
- Showing bulletins for services and referencing components when enabling and disabling.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/7a289034
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/7a289034
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/7a289034
Branch: refs/heads/develop
Commit: 7a28903402192819accaded9a90adfdfad852776
Parents: 0a7ab1a
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jun 25 14:38:47 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jun 25 14:38:47 2015 -0400
----------------------------------------------------------------------
.../disable-controller-service-dialog.jsp | 4 +-
.../canvas/enable-controller-service-dialog.jsp | 4 +-
.../src/main/webapp/css/controller-service.css | 35 +++
.../js/nf/canvas/nf-controller-service.js | 222 ++++++++++++++++---
4 files changed, 230 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
index dc76282..1ecb730 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/disable-controller-service-dialog.jsp
@@ -22,7 +22,9 @@
<div class="setting-name">Service</div>
<div class="setting-field">
<span id="disable-controller-service-id" class="hidden"></span>
- <span id="disable-controller-service-name"></span>
+ <div id="disable-controller-service-name"></div>
+ <div id="disable-controller-service-bulletins"></div>
+ <div class="clear"></div>
</div>
</div>
<div id="disable-controller-service-scope-container" class="setting">
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
index 14fe658..6613a04 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/WEB-INF/partials/canvas/enable-controller-service-dialog.jsp
@@ -22,7 +22,9 @@
<div class="setting-name">Service</div>
<div class="setting-field">
<span id="enable-controller-service-id" class="hidden"></span>
- <span id="enable-controller-service-name"></span>
+ <div id="enable-controller-service-name"></div>
+ <div id="enable-controller-service-bulletins"></div>
+ <div class="clear"></div>
</div>
</div>
<div id="enable-controller-service-scope-container" class="setting">
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
index 3d722cd..4a82810 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/css/controller-service.css
@@ -124,6 +124,15 @@ div.referencing-component-state {
margin-top: -2px;
}
+div.referencing-component-bulletins {
+ float: left;
+ margin-left: 5px;
+ width: 12px;
+ height: 12px;
+ background-color: transparent;
+ display: none;
+}
+
span.service.expansion-button {
margin-right: 4px;
margin-top: 2px;
@@ -186,6 +195,19 @@ div.referencing-component-references {
margin-right: 40px;
}
+#disable-controller-service-name {
+ float: left;
+}
+
+#disable-controller-service-bulletins {
+ float: left;
+ margin-left: 5px;
+ width: 12px;
+ height: 12px;
+ background-color: transparent;
+ display: none;
+}
+
#disable-controller-service-referencing-components {
border: 0 solid #CCCCCC;
padding: 2px;
@@ -222,6 +244,19 @@ div.referencing-component-references {
margin-right: 40px;
}
+#enable-controller-service-name {
+ float: left;
+}
+
+#enable-controller-service-bulletins {
+ float: left;
+ margin-left: 5px;
+ width: 12px;
+ height: 12px;
+ background-color: transparent;
+ display: none;
+}
+
#enable-controller-service-scope {
float: left;
width: 225px;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/7a289034/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index e8a111d..baa9edc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -241,6 +241,40 @@ nf.ControllerService = (function () {
};
/**
+ * Updates the specified bulletinIcon with the specified bulletins if necessary.
+ *
+ * @param {array} bulletins
+ * @param {jQuery} bulletinIcon
+ */
+ var updateBulletins = function (bulletins, bulletinIcon) {
+ var currentBulletins = bulletinIcon.data('bulletins');
+
+ // do nothing unless the new bulletins differ from those cached on the icon
+ if (nf.Common.doBulletinsDiffer(currentBulletins, bulletins)) {
+ bulletinIcon.data('bulletins', bulletins);
+
+ // format the new bulletins
+ var formattedBulletins = nf.Common.getFormattedBulletins(bulletins);
+
+ // non-empty: show the bulletins in a tooltip; empty: tear down any existing tooltip (else branch)
+ if (bulletins.length > 0) {
+ var list = nf.Common.formatUnorderedList(formattedBulletins);
+
+ // reuse the existing qtip if there is one, otherwise reveal the icon and create the tooltip
+ if (bulletinIcon.data('qtip')) {
+ bulletinIcon.qtip('option', 'content.text', list);
+ } else {
+ bulletinIcon.addClass('has-bulletins').show().qtip($.extend({
+ content: list
+ }, nf.CanvasUtils.config.systemTooltipConfig));
+ }
+ } else if (bulletinIcon.data('qtip')) { // no bulletins left: hide the icon and destroy its tooltip
+ bulletinIcon.removeClass('has-bulletins').removeData('bulletins').hide().qtip('api').destroy(true);
+ }
+ }
+ };
+
+ /**
* Updates the referencingComponentState using the specified referencingComponent.
*
* @param {jQuery} referencingComponentState
@@ -301,6 +335,23 @@ nf.ControllerService = (function () {
};
/**
+ * Updates the bulletins for all referencing components.
+ *
+ * @param {array} bulletins
+ */
+ var updateReferencingComponentBulletins = function(bulletins) {
+ var bulletinsBySource = d3.nest()
+ .key(function(d) { return d.sourceId; })
+ .map(bulletins, d3.map);
+
+ bulletinsBySource.forEach(function(sourceId, sourceBulletins) {
+ $('div.' + sourceId + '-bulletins').each(function() {
+ updateBulletins(sourceBulletins, $(this));
+ });
+ });
+ };
+
+ /**
* Adds the specified reference for this controller service.
*
* @param {jQuery} referenceContainer
@@ -323,10 +374,13 @@ nf.ControllerService = (function () {
}
};
+ var referencingComponentIds = [];
var processors = $('<ul class="referencing-component-listing clear"></ul>');
var services = $('<ul class="referencing-component-listing clear"></ul>');
var tasks = $('<ul class="referencing-component-listing clear"></ul>');
$.each(referencingComponents, function (_, referencingComponent) {
+ referencingComponentIds.push(referencingComponent.id);
+
if (referencingComponent.referenceType === 'Processor') {
var processorLink = $('<span class="referencing-component-name link"></span>').text(referencingComponent.name).on('click', function () {
// show the component
@@ -340,6 +394,9 @@ nf.ControllerService = (function () {
// state
var processorState = $('<div class="referencing-component-state"></div>').addClass(referencingComponent.id + '-state');
updateReferencingSchedulableComponentState(processorState, referencingComponent);
+
+ // bulletin
+ var processorBulletins = $('<div class="referencing-component-bulletins"></div>').addClass(referencingComponent.id + '-bulletins');
// type
var processorType = $('<span class="referencing-component-type"></span>').text(nf.Common.substringAfterLast(referencingComponent.type, '.'));
@@ -351,7 +408,7 @@ nf.ControllerService = (function () {
}
// processor
- var processorItem = $('<li></li>').append(processorState).append(processorLink).append(processorType).append(processorActiveThreadCount);
+ var processorItem = $('<li></li>').append(processorState).append(processorBulletins).append(processorLink).append(processorType).append(processorActiveThreadCount);
processors.append(processorItem);
} else if (referencingComponent.referenceType === 'ControllerService') {
var serviceLink = $('<span class="referencing-component-name link"></span>').text(referencingComponent.name).on('click', function () {
@@ -392,11 +449,14 @@ nf.ControllerService = (function () {
var serviceState = $('<div class="referencing-component-state"></div>').addClass(referencingComponent.id + '-state');
updateReferencingServiceState(serviceState, referencingComponent);
+ // bulletin
+ var serviceBulletins = $('<div class="referencing-component-bulletins"></div>').addClass(referencingComponent.id + '-bulletins');
+
// type
var serviceType = $('<span class="referencing-component-type"></span>').text(nf.Common.substringAfterLast(referencingComponent.type, '.'));
// service
- var serviceItem = $('<li></li>').append(serviceTwist).append(serviceState).append(serviceLink).append(serviceType).append(referencingServiceReferencesContainer);
+ var serviceItem = $('<li></li>').append(serviceTwist).append(serviceState).append(serviceBulletins).append(serviceLink).append(serviceType).append(referencingServiceReferencesContainer);
services.append(serviceItem);
} else if (referencingComponent.referenceType === 'ReportingTask') {
@@ -420,6 +480,9 @@ nf.ControllerService = (function () {
var reportingTaskState = $('<div class="referencing-component-state"></div>').addClass(referencingComponent.id + '-state');
updateReferencingSchedulableComponentState(reportingTaskState, referencingComponent);
+ // bulletins
+ var reportingTaskBulletins = $('<div class="referencing-component-bulletins"></div>').addClass(referencingComponent.id + '-bulletins');
+
// type
var reportingTaskType = $('<span class="referencing-component-type"></span>').text(nf.Common.substringAfterLast(referencingComponent.type, '.'));
@@ -430,11 +493,17 @@ nf.ControllerService = (function () {
}
// reporting task
- var reportingTaskItem = $('<li></li>').append(reportingTaskState).append(reportingTaskLink).append(reportingTaskType).append(reportingTaskActiveThreadCount);
+ var reportingTaskItem = $('<li></li>').append(reportingTaskState).append(reportingTaskBulletins).append(reportingTaskLink).append(reportingTaskType).append(reportingTaskActiveThreadCount);
tasks.append(reportingTaskItem);
}
});
+ // query for the bulletins
+ queryBulletins(referencingComponentIds).done(function(response) {
+ var bulletins = response.bulletinBoard.bulletins;
+ updateReferencingComponentBulletins(bulletins);
+ });
+
// create the collapsable listing for each type
var createReferenceBlock = function (titleText, list) {
if (list.is(':empty')) {
@@ -466,6 +535,25 @@ nf.ControllerService = (function () {
};
/**
+ * Queries for bulletins for the specified components.
+ *
+ * @param {array} componentIds
+ * @returns {deferred}
+ */
+ var queryBulletins = function(componentIds) {
+ var ids = componentIds.join('|');
+
+ return $.ajax({
+ type: 'GET',
+ url: '../nifi-api/controller/bulletin-board',
+ data: {
+ sourceId: ids
+ },
+ dataType: 'json'
+ }).fail(nf.Common.handleAjaxError);
+ };
+
+ /**
* Sets whether the specified controller service is enabled.
*
* @param {object} controllerService
@@ -491,13 +579,23 @@ nf.ControllerService = (function () {
// wait unil the polling of each service finished
return $.Deferred(function(deferred) {
updated.done(function() {
- var serviceUpdated = pollService(controllerService, function (service) {
+ var serviceUpdated = pollService(controllerService, function (service, bulletins) {
+ if ($.isArray(bulletins)) {
+ if (enabled) {
+ updateBulletins(bulletins, $('#enable-controller-service-bulletins'));
+ } else {
+ updateBulletins(bulletins, $('#disable-controller-service-bulletins'));
+ }
+ }
+
// the condition is met once the service is ENABLED/DISABLED
if (enabled) {
return service.state === 'ENABLED';
} else {
return service.state === 'DISABLED';
}
+ }, function(service) {
+ return queryBulletins([service.id]);
}, pollCondition);
// once the service has updated, resolve and render the updated service
@@ -608,9 +706,10 @@ nf.ControllerService = (function () {
*
* @param {object} controllerService
* @param {function} completeCondition
+ * @param {function} bulletinDeferred
* @param {function} pollCondition
*/
- var pollService = function (controllerService, completeCondition, pollCondition) {
+ var pollService = function (controllerService, completeCondition, bulletinDeferred, pollCondition) {
// we want to keep polling until the condition is met
return $.Deferred(function(deferred) {
var current = 2;
@@ -618,19 +717,24 @@ nf.ControllerService = (function () {
var val = current;
// update the current timeout for the next time
- current = Math.max(current * 2, 8);
+ current = Math.min(current * 2, 4);
return val * 1000;
};
// polls for the current status of the referencing components
var poll = function() {
- $.ajax({
+ var bulletins = bulletinDeferred(controllerService);
+ var service = $.ajax({
type: 'GET',
url: controllerService.uri,
dataType: 'json'
- }).done(function (response) {
- conditionMet(response.controllerService);
+ });
+
+ $.when(bulletins, service).done(function (bulletinResult, serviceResult) {
+ var bulletinResponse = bulletinResult[0];
+ var serviceResponse = serviceResult[0];
+ conditionMet(serviceResponse.controllerService, bulletinResponse.bulletinBoard.bulletins);
}).fail(function (xhr, status, error) {
deferred.reject();
nf.Common.handleAjaxError(xhr, status, error);
@@ -638,8 +742,8 @@ nf.ControllerService = (function () {
};
// tests to if the condition has been met
- var conditionMet = function (service) {
- if (completeCondition(service)) {
+ var conditionMet = function (service, bulletins) {
+ if (completeCondition(service, bulletins)) {
deferred.resolve();
} else {
if (typeof pollCondition === 'function' && pollCondition()) {
@@ -651,7 +755,12 @@ nf.ControllerService = (function () {
};
// poll for the status of the referencing components
- conditionMet(controllerService);
+ bulletinDeferred(controllerService).done(function(response) {
+ conditionMet(controllerService, response.bulletinBoard.bulletins);
+ }).fail(function (xhr, status, error) {
+ deferred.reject();
+ nf.Common.handleAjaxError(xhr, status, error);
+ });
}).promise();
};
@@ -664,7 +773,7 @@ nf.ControllerService = (function () {
*/
var stopReferencingSchedulableComponents = function (controllerService, pollCondition) {
// continue to poll the service until all schedulable components have stopped
- return pollService(controllerService, function (service) {
+ return pollService(controllerService, function (service, bulletins) {
var referencingComponents = service.referencingComponents;
var stillRunning = false;
@@ -682,9 +791,23 @@ nf.ControllerService = (function () {
updateReferencingSchedulableComponentState(referencingComponentState, referencingComponent);
}
});
+
+ // update the bulletins for the referencing components
+ updateReferencingComponentBulletins(bulletins);
// condition is met once all referencing are not running
return stillRunning === false;
+ }, function(service) {
+ var referencingSchedulableComponents = [];
+
+ var referencingComponents = service.referencingComponents;
+ $.each(referencingComponents, function(_, referencingComponent) {
+ if (referencingComponent.referenceType === 'Processor' || referencingComponent.referenceType === 'ReportingTask') {
+ referencingSchedulableComponents.push(referencingComponent.id);
+ }
+ });
+
+ return queryBulletins(referencingSchedulableComponents);
}, pollCondition);
};
@@ -696,7 +819,7 @@ nf.ControllerService = (function () {
*/
var enableReferencingServices = function (controllerService, pollCondition) {
// continue to poll the service until all referencing services are enabled
- return pollService(controllerService, function (service) {
+ return pollService(controllerService, function (service, bulletins) {
var referencingComponents = service.referencingComponents;
var notEnabled = false;
@@ -711,9 +834,23 @@ nf.ControllerService = (function () {
updateReferencingServiceState(referencingServiceState, referencingComponent);
}
});
+
+ // update the bulletins for the referencing components
+ updateReferencingComponentBulletins(bulletins);
// condition is met once all referencing are not disabled
return notEnabled === false;
+ }, function(service) {
+ var referencingSchedulableComponents = [];
+
+ var referencingComponents = service.referencingComponents;
+ $.each(referencingComponents, function(_, referencingComponent) {
+ if (referencingComponent.referenceType === 'ControllerService') {
+ referencingSchedulableComponents.push(referencingComponent.id);
+ }
+ });
+
+ return queryBulletins(referencingSchedulableComponents);
}, pollCondition);
};
@@ -725,7 +862,7 @@ nf.ControllerService = (function () {
*/
var disableReferencingServices = function (controllerService, pollCondition) {
// continue to poll the service until all referencing services are disabled
- return pollService(controllerService, function (service) {
+ return pollService(controllerService, function (service, bulletins) {
var referencingComponents = service.referencingComponents;
var notDisabled = false;
@@ -740,9 +877,23 @@ nf.ControllerService = (function () {
updateReferencingServiceState(referencingServiceState, referencingComponent);
}
});
+
+ // update the bulletins for the referencing components
+ updateReferencingComponentBulletins(bulletins);
// condition is met once all referencing are not enabled
return notDisabled === false;
+ }, function(service) {
+ var referencingSchedulableComponents = [];
+
+ var referencingComponents = service.referencingComponents;
+ $.each(referencingComponents, function(_, referencingComponent) {
+ if (referencingComponent.referenceType === 'ControllerService') {
+ referencingSchedulableComponents.push(referencingComponent.id);
+ }
+ });
+
+ return queryBulletins(referencingSchedulableComponents);
}, pollCondition);
};
@@ -823,6 +974,11 @@ nf.ControllerService = (function () {
// show the dialog
$('#disable-controller-service-dialog').modal('show');
+ // load the bulletins
+ queryBulletins([controllerService.id]).done(function(response) {
+ updateBulletins(response.bulletinBoard.bulletins, $('#disable-controller-service-bulletins'));
+ });
+
// update the border if necessary
updateReferencingComponentsBorder(referencingComponentsContainer);
};
@@ -844,6 +1000,11 @@ nf.ControllerService = (function () {
// show the dialog
$('#enable-controller-service-dialog').modal('show');
+ // load the bulletins
+ queryBulletins([controllerService.id]).done(function(response) {
+ updateBulletins(response.bulletinBoard.bulletins, $('#enable-controller-service-bulletins'));
+ });
+
// update the border if necessary
updateReferencingComponentsBorder(referencingComponentsContainer);
};
@@ -873,7 +1034,6 @@ nf.ControllerService = (function () {
}]);
// show the progress
- $('#disable-controller-service-service-container').hide();
$('#disable-controller-service-scope-container').hide();
$('#disable-controller-service-progress-container').show();
@@ -964,7 +1124,6 @@ nf.ControllerService = (function () {
}
// show the progress
- $('#enable-controller-service-service-container').hide();
$('#enable-controller-service-scope-container').hide();
$('#enable-controller-service-progress-container').show();
@@ -1223,6 +1382,7 @@ nf.ControllerService = (function () {
// empty the referencing components list
var referencingComponents = $('#controller-service-referencing-components');
nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-state');
+ nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-bulletins');
referencingComponents.css('border-width', '0').empty();
// cancel any active edits
@@ -1271,7 +1431,6 @@ nf.ControllerService = (function () {
var disableDialog = $(this);
// reset visibility
- $('#disable-controller-service-service-container').show();
$('#disable-controller-service-scope-container').show();
$('#disable-controller-service-progress-container').hide();
@@ -1279,12 +1438,17 @@ nf.ControllerService = (function () {
$('#disable-controller-service-id').text('');
$('#disable-controller-service-name').text('');
+ // bulletins
+ $('#disable-controller-service-bulletins').removeClass('has-bulletins').removeData('bulletins').hide();
+ nf.Common.cleanUpTooltips($('#disable-controller-service-service-container'), '#disable-controller-service-bulletins');
+
// reset progress
$('div.disable-referencing-components').removeClass('ajax-loading ajax-complete ajax-error');
// referencing components
var referencingComponents = $('#disable-controller-service-referencing-components');
nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-state');
+ nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-bulletins');
referencingComponents.css('border-width', '0').empty();
// reset dialog
@@ -1339,7 +1503,6 @@ nf.ControllerService = (function () {
var enableDialog = $(this);
// reset visibility
- $('#enable-controller-service-service-container').show();
$('#enable-controller-service-scope-container').show();
$('#enable-controller-service-progress-container').hide();
$('#enable-controller-service-progress li.referencing-component').show();
@@ -1348,12 +1511,17 @@ nf.ControllerService = (function () {
$('#enable-controller-service-id').text('');
$('#enable-controller-service-name').text('');
+ // bulletins
+ $('#enable-controller-service-bulletins').removeClass('has-bulletins').removeData('bulletins').hide();
+ nf.Common.cleanUpTooltips($('#enable-controller-service-service-container'), '#enable-controller-service-bulletins');
+
// reset progress
$('div.enable-referencing-components').removeClass('ajax-loading ajax-complete ajax-error');
// referencing components
var referencingComponents = $('#enable-controller-service-referencing-components');
nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-state');
+ nf.Common.cleanUpTooltips(referencingComponents, 'div.referencing-component-bulletins');
referencingComponents.css('border-width', '0').empty();
// reset dialog
@@ -1654,13 +1822,7 @@ nf.ControllerService = (function () {
* @param {object} controllerService
*/
enable: function(controllerService) {
- if (nf.Common.isEmpty(controllerService.referencingComponents)) {
- setEnabled(controllerService, true).always(function () {
- reloadControllerServiceAndReferencingComponents(controllerService);
- });
- } else {
- showEnableControllerServiceDialog(controllerService);
- }
+ showEnableControllerServiceDialog(controllerService);
},
/**
@@ -1669,13 +1831,7 @@ nf.ControllerService = (function () {
* @param {object} controllerService
*/
disable: function(controllerService) {
- if (nf.Common.isEmpty(controllerService.referencingComponents)) {
- setEnabled(controllerService, false).always(function () {
- reloadControllerServiceAndReferencingComponents(controllerService);
- });
- } else {
- showDisableControllerServiceDialog(controllerService);
- }
+ showDisableControllerServiceDialog(controllerService);
},
/**
[02/10] incubator-nifi git commit: NIFI-724: Enable bulletins for
reporting tasks and controller services
Posted by mc...@apache.org.
NIFI-724: Enable bulletins for reporting tasks and controller services
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/e240e07a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/e240e07a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/e240e07a
Branch: refs/heads/develop
Commit: e240e07aaebea1fd66b22fce8aec3f0005fd3f60
Parents: e767f5c
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Jun 24 14:03:34 2015 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Jun 25 10:56:49 2015 -0400
----------------------------------------------------------------------
.../org/apache/nifi/reporting/Bulletin.java | 11 ++-
.../apache/nifi/reporting/ComponentType.java | 58 ++++++++++++++
.../manager/impl/ClusteredReportingContext.java | 46 ++++++++++-
.../cluster/manager/impl/WebClusterManager.java | 80 ++++++++++++--------
.../org/apache/nifi/events/BulletinFactory.java | 30 ++++++--
.../org/apache/nifi/events/SystemBulletin.java | 2 +
.../nifi/logging/LogRepositoryFactory.java | 6 +-
.../apache/nifi/controller/FlowController.java | 35 ++++++---
.../service/ControllerServiceLoader.java | 11 +--
.../nifi/events/VolatileBulletinRepository.java | 18 +++--
.../org/apache/nifi/jaxb/AdaptedBulletin.java | 10 +++
.../org/apache/nifi/jaxb/BulletinAdapter.java | 3 +-
.../logging/ControllerServiceLogObserver.java | 45 +++++++++++
.../nifi/logging/ReportingTaskLogObserver.java | 45 +++++++++++
.../nifi/remote/StandardRemoteProcessGroup.java | 16 ++--
.../nifi/remote/StandardRootGroupPort.java | 4 +-
16 files changed, 349 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
index 87443a3..fe370ae 100644
--- a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/Bulletin.java
@@ -34,6 +34,7 @@ public abstract class Bulletin implements Comparable<Bulletin> {
private String groupId;
private String sourceId;
private String sourceName;
+ private ComponentType sourceType;
protected Bulletin(final long id) {
this.timestamp = new Date();
@@ -104,9 +105,17 @@ public abstract class Bulletin implements Comparable<Bulletin> {
this.sourceName = sourceName;
}
+ public ComponentType getSourceType() {
+ return sourceType;
+ }
+
+ public void setSourceType(ComponentType sourceType) {
+ this.sourceType = sourceType;
+ }
+
@Override
public String toString() {
- return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + '}';
+ return "Bulletin{" + "id=" + id + ", message=" + message + ", sourceName=" + sourceName + ", sourceType=" + sourceType + '}';
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
new file mode 100644
index 0000000..97f3538
--- /dev/null
+++ b/nifi/nifi-api/src/main/java/org/apache/nifi/reporting/ComponentType.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting;
+
+/**
+ * An Enumeration for indicating which type of component a Bulletin is associated with
+ */
+public enum ComponentType {
+
+ /**
+ * Bulletin is associated with a Processor
+ */
+ PROCESSOR,
+
+ /**
+ * Bulletin is associated with a Remote Process Group
+ */
+ REMOTE_PROCESS_GROUP,
+
+ /**
+ * Bulletin is associated with an Input Port
+ */
+ INPUT_PORT,
+
+ /**
+ * Bulletin is associated with an Output Port
+ */
+ OUTPUT_PORT,
+
+ /**
+ * Bulletin is associated with a Reporting Task
+ */
+ REPORTING_TASK,
+
+ /**
+ * Bulletin is associated with a Controller Service
+ */
+ CONTROLLER_SERVICE,
+
+ /**
+ * Bulletin is a system-level bulletin, associated with the Flow Controller
+ */
+ FLOW_CONTROLLER;
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
index e546f87..c6624cc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/ClusteredReportingContext.java
@@ -24,15 +24,18 @@ import org.apache.nifi.attribute.expression.language.PreparedQuery;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.controller.ControllerService;
import org.apache.nifi.controller.ControllerServiceLookup;
import org.apache.nifi.controller.service.ControllerServiceProvider;
import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.processor.StandardPropertyValue;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.Severity;
@@ -85,8 +88,9 @@ public class ClusteredReportingContext implements ReportingContext {
final ProcessGroupStatus rootGroupStatus = eventAccess.getControllerStatus();
final String groupId = findGroupId(rootGroupStatus, componentId);
final String componentName = findComponentName(rootGroupStatus, componentId);
+ final ComponentType componentType = findComponentType(rootGroupStatus, componentId);
- return BulletinFactory.createBulletin(groupId, componentId, componentName, category, severity.name(), message);
+ return BulletinFactory.createBulletin(groupId, componentId, componentType, componentName, category, severity.name(), message);
}
@Override
@@ -134,6 +138,46 @@ public class ClusteredReportingContext implements ReportingContext {
return null;
}
+ private ComponentType findComponentType(final ProcessGroupStatus groupStatus, final String componentId) {
+ for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
+ if (procStatus.getId().equals(componentId)) {
+ return ComponentType.PROCESSOR;
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getInputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return ComponentType.INPUT_PORT;
+ }
+ }
+
+ for (final PortStatus portStatus : groupStatus.getOutputPortStatus()) {
+ if (portStatus.getId().equals(componentId)) {
+ return ComponentType.OUTPUT_PORT;
+ }
+ }
+
+ for (final RemoteProcessGroupStatus remoteStatus : groupStatus.getRemoteProcessGroupStatus()) {
+ if (remoteStatus.getId().equals(componentId)) {
+ return ComponentType.REMOTE_PROCESS_GROUP;
+ }
+ }
+
+ for (final ProcessGroupStatus childGroup : groupStatus.getProcessGroupStatus()) {
+ final ComponentType type = findComponentType(childGroup, componentId);
+ if (type != null) {
+ return type;
+ }
+ }
+
+ final ControllerService service = serviceProvider.getControllerService(componentId);
+ if (service != null) {
+ return ComponentType.CONTROLLER_SERVICE;
+ }
+
+ return null;
+ }
+
private String findComponentName(final ProcessGroupStatus groupStatus, final String componentId) {
for (final ProcessorStatus procStatus : groupStatus.getProcessorStatus()) {
if (procStatus.getId().equals(componentId)) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
index d6ba6db..9edc83f 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster/src/main/java/org/apache/nifi/cluster/manager/impl/WebClusterManager.java
@@ -129,6 +129,7 @@ import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.ScheduledState;
import org.apache.nifi.controller.StandardFlowSerializer;
+import org.apache.nifi.controller.StandardProcessorNode;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
import org.apache.nifi.controller.reporting.ClusteredReportingTaskNode;
@@ -159,7 +160,12 @@ import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.framework.security.util.SslContextFactory;
import org.apache.nifi.io.socket.multicast.DiscoverableService;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
+import org.apache.nifi.logging.LogLevel;
+import org.apache.nifi.logging.LogRepository;
+import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.NiFiLog;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -929,7 +935,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
//optional properties for all ReportingTasks
for (final Element taskElement : DomUtils.getChildElementsByTagName(tasksElement, "reportingTask")) {
//add global properties common to all tasks
- Map<String, String> properties = new HashMap<>();
+ final Map<String, String> properties = new HashMap<>();
//get properties for the specific reporting task - id, name, class,
//and schedulingPeriod must be set
@@ -1080,6 +1086,11 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
}
+ // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
return taskNode;
}
@@ -1368,7 +1379,14 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+ // Register log observer to provide bulletins when controller service logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+ return serviceNode;
}
@Override
@@ -1630,7 +1648,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final String nodeAddress = nodeIdentifier.getApiAddress() + ":" + nodeIdentifier.getApiPort();
// unmarshal the message
- BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
+ final BulletinsPayload payload = BulletinsPayload.unmarshal(bulletins.getPayload());
for (final Bulletin bulletin : payload.getBulletins()) {
bulletin.setNodeAddress(nodeAddress);
bulletinRepository.addBulletin(bulletin);
@@ -1688,7 +1706,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final int numPendingHeartbeats = mostRecentHeartbeats.size();
if (heartbeatLogger.isDebugEnabled()) {
- heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, (numPendingHeartbeats > 1) ? "s" : ""));
+ heartbeatLogger.debug(String.format("Handling %s heartbeat%s", numPendingHeartbeats, numPendingHeartbeats > 1 ? "s" : ""));
}
for (final Heartbeat mostRecentHeartbeat : mostRecentHeartbeats) {
@@ -2130,7 +2148,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
readLock.lock();
try {
final ClusterServicesBroadcaster broadcaster = this.servicesBroadcaster;
- return (broadcaster != null && broadcaster.isRunning());
+ return broadcaster != null && broadcaster.isRunning();
} finally {
readLock.unlock("isBroadcasting");
}
@@ -2323,7 +2341,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
if (auditService != null) {
try {
auditService.addActions(clusterContext.getActions());
- } catch (Throwable t) {
+ } catch (final Throwable t) {
logger.warn("Unable to record actions: " + t.getMessage());
if (logger.isDebugEnabled()) {
logger.warn(StringUtils.EMPTY, t);
@@ -2834,7 +2852,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProcessorEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
+ final ProcessorEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorEntity.class);
final ProcessorDTO nodeProcessor = nodeResponseEntity.getProcessor();
processorMap.put(nodeResponse.getNodeId(), nodeProcessor);
}
@@ -2851,7 +2869,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProcessorsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
+ final ProcessorsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessorsEntity.class);
final Set<ProcessorDTO> nodeProcessors = nodeResponseEntity.getProcessors();
for (final ProcessorDTO nodeProcessor : nodeProcessors) {
@@ -2892,7 +2910,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
+ final ProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProcessGroupEntity.class);
final ProcessGroupDTO nodeProcessGroup = nodeResponseEntity.getProcessGroup();
for (final ProcessorDTO nodeProcessor : nodeProcessGroup.getContents().getProcessors()) {
@@ -2952,7 +2970,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final FlowSnippetEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
+ final FlowSnippetEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(FlowSnippetEntity.class);
final FlowSnippetDTO nodeContents = nodeResponseEntity.getContents();
for (final ProcessorDTO nodeProcessor : nodeContents.getProcessors()) {
@@ -2995,7 +3013,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
// create a new client response
clientResponse = new NodeResponse(clientResponse, responseEntity);
- } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupEndpoint(uri, method))) {
+ } else if (hasSuccessfulClientResponse && isRemoteProcessGroupEndpoint(uri, method)) {
final RemoteProcessGroupEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
final RemoteProcessGroupDTO remoteProcessGroup = responseEntity.getRemoteProcessGroup();
@@ -3005,7 +3023,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final RemoteProcessGroupEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
+ final RemoteProcessGroupEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupEntity.class);
final RemoteProcessGroupDTO nodeRemoteProcessGroup = nodeResponseEntity.getRemoteProcessGroup();
remoteProcessGroupMap.put(nodeResponse.getNodeId(), nodeRemoteProcessGroup);
@@ -3013,7 +3031,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
mergeRemoteProcessGroup(remoteProcessGroup, remoteProcessGroupMap);
clientResponse = new NodeResponse(clientResponse, responseEntity);
- } else if (hasSuccessfulClientResponse && (isRemoteProcessGroupsEndpoint(uri, method))) {
+ } else if (hasSuccessfulClientResponse && isRemoteProcessGroupsEndpoint(uri, method)) {
final RemoteProcessGroupsEntity responseEntity = clientResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupDTO> remoteProcessGroups = responseEntity.getRemoteProcessGroups();
@@ -3023,7 +3041,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final RemoteProcessGroupsEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
+ final RemoteProcessGroupsEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(RemoteProcessGroupsEntity.class);
final Set<RemoteProcessGroupDTO> nodeRemoteProcessGroups = nodeResponseEntity.getRemoteProcessGroups();
for (final RemoteProcessGroupDTO nodeRemoteProcessGroup : nodeRemoteProcessGroups) {
@@ -3056,7 +3074,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ProvenanceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
+ final ProvenanceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ProvenanceEntity.class);
final ProvenanceDTO nodeQuery = nodeResponseEntity.getProvenance();
resultsMap.put(nodeResponse.getNodeId(), nodeQuery);
@@ -3084,7 +3102,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ControllerServiceEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
+ final ControllerServiceEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceEntity.class);
final ControllerServiceDTO nodeControllerService = nodeResponseEntity.getControllerService();
resultsMap.put(nodeResponse.getNodeId(), nodeControllerService);
@@ -3102,7 +3120,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ControllerServicesEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
+ final ControllerServicesEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServicesEntity.class);
final Set<ControllerServiceDTO> nodeControllerServices = nodeResponseEntity.getControllerServices();
for (final ControllerServiceDTO nodeControllerService : nodeControllerServices) {
@@ -3136,7 +3154,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
final ControllerServiceReferencingComponentsEntity nodeResponseEntity =
- (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
+ nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ControllerServiceReferencingComponentsEntity.class);
final Set<ControllerServiceReferencingComponentDTO> nodeReferencingComponents = nodeResponseEntity.getControllerServiceReferencingComponents();
resultsMap.put(nodeResponse.getNodeId(), nodeReferencingComponents);
@@ -3154,7 +3172,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ReportingTaskEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
+ final ReportingTaskEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTaskEntity.class);
final ReportingTaskDTO nodeReportingTask = nodeResponseEntity.getReportingTask();
resultsMap.put(nodeResponse.getNodeId(), nodeReportingTask);
@@ -3172,7 +3190,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
continue;
}
- final ReportingTasksEntity nodeResponseEntity = (nodeResponse == clientResponse) ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
+ final ReportingTasksEntity nodeResponseEntity = nodeResponse == clientResponse ? responseEntity : nodeResponse.getClientResponse().getEntity(ReportingTasksEntity.class);
final Set<ReportingTaskDTO> nodeReportingTasks = nodeResponseEntity.getReportingTasks();
for (final ReportingTaskDTO nodeReportingTask : nodeReportingTasks) {
@@ -3428,7 +3446,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
}
private boolean canChangeNodeState(final String method, final URI uri) {
- return (HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method));
+ return HttpMethod.DELETE.equalsIgnoreCase(method) || HttpMethod.POST.equalsIgnoreCase(method) || HttpMethod.PUT.equalsIgnoreCase(method);
}
private void notifyDataFlowManagementServiceOfNodeStatusChange() {
@@ -3477,7 +3495,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public void run() {
logger.info("Entering safe mode...");
final int safeModeSeconds = (int) FormatUtils.getTimeDuration(properties.getClusterManagerSafeModeDuration(), TimeUnit.SECONDS);
- final long timeToElect = (safeModeSeconds <= 0) ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
+ final long timeToElect = safeModeSeconds <= 0 ? Long.MAX_VALUE : threadStartTime + TimeUnit.MILLISECONDS.convert(safeModeSeconds, TimeUnit.SECONDS);
boolean exitSafeMode = false;
while (isRunning()) {
@@ -3819,7 +3837,7 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
public static Date normalizeStatusSnapshotDate(final Date toNormalize, final long numMillis) {
final long time = toNormalize.getTime();
- return new Date(time - (time % numMillis));
+ return new Date(time - time % numMillis);
}
private NodeDTO createNodeDTO(final Node node) {
@@ -3861,8 +3879,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> processorDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -3942,8 +3960,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> connectionDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4006,8 +4024,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> processGroupDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
@@ -4070,8 +4088,8 @@ public class WebClusterManager implements HttpClusterManager, ProtocolHandler, C
final List<NodeStatusHistoryDTO> nodeHistories = new ArrayList<>();
StatusHistoryDTO lastStatusHistory = null;
- Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
- Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
+ final Set<MetricDescriptor<?>> remoteProcessGroupDescriptors = new LinkedHashSet<>();
+ final Map<Date, List<StatusSnapshot>> snapshotsToAggregate = new TreeMap<>();
for (final Node node : getRawNodes()) {
final ComponentStatusRepository statusRepository = componentMetricsRepositoryMap.get(node.getNodeId());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
index d1d5e5b..4795827 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/BulletinFactory.java
@@ -17,24 +17,43 @@
package org.apache.nifi.events;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
-/**
- *
- */
public final class BulletinFactory {
private static final AtomicLong currentId = new AtomicLong(0);
public static Bulletin createBulletin(final Connectable connectable, final String category, final String severity, final String message) {
- return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), connectable.getName(), category, severity, message);
+ final ComponentType type;
+ switch (connectable.getConnectableType()) {
+ case REMOTE_INPUT_PORT:
+ case REMOTE_OUTPUT_PORT:
+ type = ComponentType.REMOTE_PROCESS_GROUP;
+ break;
+ case INPUT_PORT:
+ type = ComponentType.INPUT_PORT;
+ break;
+ case OUTPUT_PORT:
+ type = ComponentType.OUTPUT_PORT;
+ break;
+ case PROCESSOR:
+ default:
+ type = ComponentType.PROCESSOR;
+ break;
+ }
+
+ return BulletinFactory.createBulletin(connectable.getProcessGroup().getIdentifier(), connectable.getIdentifier(), type, connectable.getName(), category, severity, message);
}
- public static Bulletin createBulletin(final String groupId, final String sourceId, final String sourceName, final String category, final String severity, final String message) {
+ public static Bulletin createBulletin(final String groupId, final String sourceId, final ComponentType sourceType, final String sourceName,
+ final String category, final String severity, final String message) {
final Bulletin bulletin = new ComponentBulletin(currentId.getAndIncrement());
bulletin.setGroupId(groupId);
bulletin.setSourceId(sourceId);
+ bulletin.setSourceType(sourceType);
bulletin.setSourceName(sourceName);
bulletin.setCategory(category);
bulletin.setLevel(severity);
@@ -47,6 +66,7 @@ public final class BulletinFactory {
bulletin.setCategory(category);
bulletin.setLevel(severity);
bulletin.setMessage(message);
+ bulletin.setSourceType(ComponentType.FLOW_CONTROLLER);
return bulletin;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
index f97dc46..3359e7e 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/events/SystemBulletin.java
@@ -17,6 +17,7 @@
package org.apache.nifi.events;
import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.ComponentType;
/**
*
@@ -25,6 +26,7 @@ public class SystemBulletin extends Bulletin {
SystemBulletin(final long id) {
super(id);
+ setSourceType(ComponentType.FLOW_CONTROLLER);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
index 76ca661..d7fa3fc 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/logging/LogRepositoryFactory.java
@@ -41,8 +41,8 @@ public class LogRepositoryFactory {
logRepositoryClass = clazz;
}
- public static LogRepository getRepository(final String processorId) {
- LogRepository repository = repositoryMap.get(requireNonNull(processorId));
+ public static LogRepository getRepository(final String componentId) {
+ LogRepository repository = repositoryMap.get(requireNonNull(componentId));
if (repository == null) {
try {
repository = logRepositoryClass.newInstance();
@@ -50,7 +50,7 @@ public class LogRepositoryFactory {
throw new RuntimeException(e);
}
- final LogRepository oldRepository = repositoryMap.putIfAbsent(processorId, repository);
+ final LogRepository oldRepository = repositoryMap.putIfAbsent(componentId, repository);
if (oldRepository != null) {
repository = oldRepository;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 2ffdd4e..b6edbbb 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -137,11 +137,13 @@ import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.ControllerServiceLogObserver;
import org.apache.nifi.logging.LogLevel;
import org.apache.nifi.logging.LogRepository;
import org.apache.nifi.logging.LogRepositoryFactory;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.logging.ProcessorLogObserver;
+import org.apache.nifi.logging.ReportingTaskLogObserver;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.nar.NarThreadContextClassLoader;
@@ -593,7 +595,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
writeLock.lock();
try {
if (startDelayedComponents) {
- LOG.info("Starting {} processors/ports/funnels", (startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size()));
+ LOG.info("Starting {} processors/ports/funnels", startConnectablesAfterInitialization.size() + startRemoteGroupPortsAfterInitialization.size());
for (final Connectable connectable : startConnectablesAfterInitialization) {
if (connectable.getScheduledState() == ScheduledState.DISABLED) {
continue;
@@ -1012,7 +1014,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public boolean isTerminated() {
this.readLock.lock();
try {
- return (null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated());
+ return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated();
} finally {
this.readLock.unlock();
}
@@ -1828,9 +1830,9 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
} else if (id1.equals(id2)) {
return true;
} else {
- final String comparable1 = (id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1);
- final String comparable2 = (id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2);
- return (comparable1.equals(comparable2));
+ final String comparable1 = id1.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id1;
+ final String comparable2 = id2.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id2;
+ return comparable1.equals(comparable2);
}
}
@@ -1964,7 +1966,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
final String searchId = id.equals(ROOT_GROUP_ID_ALIAS) ? getRootGroupId() : id;
- return (root == null) ? null : root.findProcessGroup(searchId);
+ return root == null ? null : root.findProcessGroup(searchId);
}
@Override
@@ -2079,8 +2081,8 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
connStatus.setOutputBytes(connectionStatusReport.getContentSizeOut());
connStatus.setOutputCount(connectionStatusReport.getFlowFilesOut());
- flowFilesTransferred += (connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut());
- bytesTransferred += (connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut());
+ flowFilesTransferred += connectionStatusReport.getFlowFilesIn() + connectionStatusReport.getFlowFilesOut();
+ bytesTransferred += connectionStatusReport.getContentSizeIn() + connectionStatusReport.getContentSizeOut();
}
if (StringUtils.isNotBlank(conn.getName())) {
@@ -2552,6 +2554,12 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
reportingTasks.put(id, taskNode);
+
+ // Register log observer to provide bulletins when reporting task logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ReportingTaskLogObserver(getBulletinRepository(), taskNode));
+
return taskNode;
}
@@ -2616,7 +2624,14 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
@Override
public ControllerServiceNode createControllerService(final String type, final String id, final boolean firstTimeAdded) {
- return controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+ final ControllerServiceNode serviceNode = controllerServiceProvider.createControllerService(type, id, firstTimeAdded);
+
+ // Register log observer to provide bulletins when controller service logs anything at WARN level or above
+ final LogRepository logRepository = LogRepositoryFactory.getRepository(id);
+ logRepository.addObserver(StandardProcessorNode.BULLETIN_OBSERVER_ID, LogLevel.WARN,
+ new ControllerServiceLogObserver(getBulletinRepository(), serviceNode));
+
+ return serviceNode;
}
@Override
@@ -3480,7 +3495,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (bulletin.getGroupId() == null) {
escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
} else {
- escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(),
+ escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
}
} else {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
index 92fa3b2..b5c3855 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ControllerServiceLoader.java
@@ -48,11 +48,12 @@ public class ControllerServiceLoader {
private static final Logger logger = LoggerFactory.getLogger(ControllerServiceLoader.class);
public static List<ControllerServiceNode> loadControllerServices(
- final ControllerServiceProvider provider,
- final InputStream serializedStream,
- final StringEncryptor encryptor,
- final BulletinRepository bulletinRepo,
- final boolean autoResumeState) throws IOException {
+ final ControllerServiceProvider provider,
+ final InputStream serializedStream,
+ final StringEncryptor encryptor,
+ final BulletinRepository bulletinRepo,
+ final boolean autoResumeState) throws IOException {
+
final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
documentBuilderFactory.setNamespaceAware(true);
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
index a20e974..5172d34 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/events/VolatileBulletinRepository.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.reporting.Bulletin;
import org.apache.nifi.reporting.BulletinQuery;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.util.RingBuffer;
import org.apache.nifi.util.RingBuffer.Filter;
@@ -167,7 +168,7 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
final RingBuffer<Bulletin> buffer = componentMap.get(CONTROLLER_BULLETIN_STORE_KEY);
- return (buffer == null) ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
+ return buffer == null ? Collections.<Bulletin>emptyList() : buffer.getSelectedElements(new Filter<Bulletin>() {
@Override
public boolean select(final Bulletin bulletin) {
return bulletin.getTimestamp().getTime() >= fiveMinutesAgo;
@@ -194,12 +195,12 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
private RingBuffer<Bulletin> getBulletinBuffer(final Bulletin bulletin) {
- final String groupId = getBulletinStoreKey(bulletin);
+ final String storageKey = getBulletinStoreKey(bulletin);
- ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(groupId);
+ ConcurrentMap<String, RingBuffer<Bulletin>> componentMap = bulletinStoreMap.get(storageKey);
if (componentMap == null) {
componentMap = new ConcurrentHashMap<>();
- ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(groupId, componentMap);
+ final ConcurrentMap<String, RingBuffer<Bulletin>> existing = bulletinStoreMap.putIfAbsent(storageKey, componentMap);
if (existing != null) {
componentMap = existing;
}
@@ -221,11 +222,16 @@ public class VolatileBulletinRepository implements BulletinRepository {
}
private String getBulletinStoreKey(final Bulletin bulletin) {
- return isControllerBulletin(bulletin) ? CONTROLLER_BULLETIN_STORE_KEY : bulletin.getGroupId();
+ if (isControllerBulletin(bulletin)) {
+ return CONTROLLER_BULLETIN_STORE_KEY;
+ }
+
+ final String groupId = bulletin.getGroupId();
+ return groupId == null ? bulletin.getSourceId() : groupId;
}
private boolean isControllerBulletin(final Bulletin bulletin) {
- return bulletin.getGroupId() == null;
+ return ComponentType.FLOW_CONTROLLER.equals(bulletin.getSourceType());
}
private class DefaultBulletinProcessingStrategy implements BulletinProcessingStrategy {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
index 6f1dc45..17c5991 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/AdaptedBulletin.java
@@ -18,6 +18,8 @@ package org.apache.nifi.jaxb;
import java.util.Date;
+import org.apache.nifi.reporting.ComponentType;
+
/**
*
*/
@@ -32,6 +34,7 @@ public class AdaptedBulletin {
private String groupId;
private String sourceId;
private String sourceName;
+ private ComponentType sourceType;
public String getCategory() {
return category;
@@ -97,4 +100,11 @@ public class AdaptedBulletin {
this.timestamp = timestamp;
}
+ public ComponentType getSourceType() {
+ return sourceType;
+ }
+
+ public void setSourceType(ComponentType sourceType) {
+ this.sourceType = sourceType;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
index b699348..acbe0dd 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/jaxb/BulletinAdapter.java
@@ -34,7 +34,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
if (b.getSourceId() == null) {
return BulletinFactory.createBulletin(b.getCategory(), b.getLevel(), b.getMessage());
} else {
- return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
+ return BulletinFactory.createBulletin(b.getGroupId(), b.getSourceId(), b.getSourceType(), b.getSourceName(), b.getCategory(), b.getLevel(), b.getMessage());
}
}
@@ -48,6 +48,7 @@ public class BulletinAdapter extends XmlAdapter<AdaptedBulletin, Bulletin> {
aBulletin.setTimestamp(b.getTimestamp());
aBulletin.setGroupId(b.getGroupId());
aBulletin.setSourceId(b.getSourceId());
+ aBulletin.setSourceType(b.getSourceType());
aBulletin.setSourceName(b.getSourceName());
aBulletin.setCategory(b.getCategory());
aBulletin.setLevel(b.getLevel());
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
new file mode 100644
index 0000000..837e1c4
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ControllerServiceLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.service.ControllerServiceNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * A {@link LogObserver} that converts every log message it receives for a controller service into a
+ * {@link Bulletin} and adds that bulletin to the supplied {@link BulletinRepository}.
+ */
+public class ControllerServiceLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ControllerServiceNode serviceNode;
+
+    /**
+     * @param bulletinRepository the repository to which generated bulletins are added
+     * @param serviceNode the controller service whose identifier and name are stamped on each bulletin
+     */
+    public ControllerServiceLogObserver(final BulletinRepository bulletinRepository, final ControllerServiceNode serviceNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.serviceNode = serviceNode;
+    }
+
+    /**
+     * Creates a "Log Message" bulletin from the given log message and adds it to the bulletin repository.
+     *
+     * @param message the log message to publish as a bulletin
+     */
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        // Map LogLevel.WARN to Severity.WARNING so that we are consistent with the Severity enumeration. Else, just use whatever
+        // the LogLevel is (INFO and ERROR map directly and all others we will just accept as they are).
+        final String bulletinLevel = message.getLevel() == LogLevel.WARN ? Severity.WARNING.name() : message.getLevel().toString();
+
+        // The source is a controller service, so tag the bulletin with CONTROLLER_SERVICE (not REPORTING_TASK); otherwise
+        // consumers that filter bulletins by source type would attribute it to the wrong kind of component.
+        // No group id is supplied (null) for the bulletin.
+        final Bulletin bulletin = BulletinFactory.createBulletin(null, serviceNode.getIdentifier(), ComponentType.CONTROLLER_SERVICE,
+            serviceNode.getName(), "Log Message", bulletinLevel, message.getMessage());
+        bulletinRepository.addBulletin(bulletin);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
new file mode 100644
index 0000000..e5638d6
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/logging/ReportingTaskLogObserver.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.logging;
+
+import org.apache.nifi.controller.ReportingTaskNode;
+import org.apache.nifi.events.BulletinFactory;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
+import org.apache.nifi.reporting.Severity;
+
+/**
+ * A {@link LogObserver} that turns each log message it is handed for a reporting task into a bulletin
+ * and adds that bulletin to the supplied {@link BulletinRepository}.
+ */
+public class ReportingTaskLogObserver implements LogObserver {
+    private final BulletinRepository bulletinRepository;
+    private final ReportingTaskNode taskNode;
+
+    /**
+     * @param bulletinRepository the repository to which generated bulletins are added
+     * @param taskNode the reporting task whose identifier and name are stamped on each bulletin
+     */
+    public ReportingTaskLogObserver(final BulletinRepository bulletinRepository, final ReportingTaskNode taskNode) {
+        this.bulletinRepository = bulletinRepository;
+        this.taskNode = taskNode;
+    }
+
+    /**
+     * Publishes the given log message as a "Log Message" bulletin attributed to the reporting task.
+     *
+     * @param message the log message to publish as a bulletin
+     */
+    @Override
+    public void onLogMessage(final LogMessage message) {
+        final LogLevel level = message.getLevel();
+
+        // Bulletins use the Severity vocabulary, so WARN is reported as WARNING; every other level is passed through as-is.
+        final String severityName;
+        if (level == LogLevel.WARN) {
+            severityName = Severity.WARNING.name();
+        } else {
+            severityName = level.toString();
+        }
+
+        // No group id is supplied (null) for the bulletin.
+        bulletinRepository.addBulletin(BulletinFactory.createBulletin(null, taskNode.getIdentifier(), ComponentType.REPORTING_TASK,
+            taskNode.getName(), "Log Message", severityName, message.getMessage()));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
index d19b5c1..61516d0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/remote/StandardRemoteProcessGroup.java
@@ -57,6 +57,7 @@ import org.apache.nifi.groups.ProcessGroupCounts;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroupPortDescriptor;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
@@ -164,7 +165,8 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
final String groupId = StandardRemoteProcessGroup.this.getProcessGroup().getIdentifier();
final String sourceId = StandardRemoteProcessGroup.this.getIdentifier();
final String sourceName = StandardRemoteProcessGroup.this.getName();
- bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+ bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, ComponentType.REMOTE_PROCESS_GROUP,
+ sourceName, category, severity.name(), message));
}
};
@@ -227,7 +229,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
@Override
public String getName() {
final String name = this.name.get();
- return (name == null) ? targetUri.toString() : name;
+ return name == null ? targetUri.toString() : name;
}
@Override
@@ -671,7 +673,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
private ProcessGroup getRootGroup(final ProcessGroup context) {
final ProcessGroup parent = context.getParent();
- return (parent == null) ? context : getRootGroup(parent);
+ return parent == null ? context : getRootGroup(parent);
}
private boolean isWebApiSecure() {
@@ -714,7 +716,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
public Date getLastRefreshTime() {
readLock.lock();
try {
- return (refreshContentsTimestamp == null) ? null : new Date(refreshContentsTimestamp);
+ return refreshContentsTimestamp == null ? null : new Date(refreshContentsTimestamp);
} finally {
readLock.unlock();
}
@@ -855,7 +857,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
Set<RemoteProcessGroupPortDescriptor> remotePorts = null;
if (ports != null) {
remotePorts = new LinkedHashSet<>(ports.size());
- for (PortDTO port : ports) {
+ for (final PortDTO port : ports) {
final StandardRemoteProcessGroupPortDescriptor descriptor = new StandardRemoteProcessGroupPortDescriptor();
final ScheduledState scheduledState = ScheduledState.valueOf(port.getState());
descriptor.setId(port.getId());
@@ -1093,7 +1095,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
}
final String remoteInstanceId = dto.getInstanceId();
- boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
+ final boolean isPointingToCluster = flowController.getInstanceId().equals(remoteInstanceId);
pointsToCluster.set(isPointingToCluster);
} else if (statusCode == UNAUTHORIZED_STATUS_CODE) {
try {
@@ -1120,7 +1122,7 @@ public class StandardRemoteProcessGroup implements RemoteProcessGroup {
new Object[]{this, response.getStatus(), response.getStatusInfo().getReasonPhrase(), message});
authorizationIssue = "Unable to determine Site-to-Site availability.";
}
- } catch (Exception e) {
+ } catch (final Exception e) {
logger.warn(String.format("Unable to connect to %s due to %s", StandardRemoteProcessGroup.this, e));
getEventReporter().reportEvent(Severity.WARNING, "Site to Site", String.format("Unable to connect to %s due to %s",
StandardRemoteProcessGroup.this.getTargetUri().toString(), e));
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/e240e07a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
index 4bb1683..9eadec0 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/StandardRootGroupPort.java
@@ -61,6 +61,7 @@ import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.ComponentType;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.user.NiFiUser;
@@ -108,7 +109,8 @@ public class StandardRootGroupPort extends AbstractPort implements RootGroupPort
final String groupId = StandardRootGroupPort.this.getProcessGroup().getIdentifier();
final String sourceId = StandardRootGroupPort.this.getIdentifier();
final String sourceName = StandardRootGroupPort.this.getName();
- bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
+ final ComponentType componentType = direction == TransferDirection.RECEIVE ? ComponentType.INPUT_PORT : ComponentType.OUTPUT_PORT;
+ bulletinRepository.addBulletin(BulletinFactory.createBulletin(groupId, sourceId, componentType, sourceName, category, severity.name(), message));
}
};
[07/10] incubator-nifi git commit: NIFI-694: - Fixing bug when
clearing bulletins from the reporting task table. - Allowed ENABLING state to
trigger transition to next step of the enable/disable request.
Posted by mc...@apache.org.
NIFI-694:
- Fixing bug when clearing bulletins from the reporting task table.
- Allowed ENABLING state to trigger transition to next step of the enable/disable request.
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4fd1e949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4fd1e949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4fd1e949
Branch: refs/heads/develop
Commit: 4fd1e9494e41bea198dabedacee16ed018a3a7b7
Parents: e7c0461
Author: Matt Gilman <ma...@gmail.com>
Authored: Wed Jul 1 14:25:12 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Wed Jul 1 14:25:12 2015 -0400
----------------------------------------------------------------------
.../src/main/webapp/js/nf/canvas/nf-controller-service.js | 6 +++---
.../nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js | 2 +-
2 files changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4fd1e949/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
index efcf9fb..078064a 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
@@ -591,9 +591,9 @@ nf.ControllerService = (function () {
}
}
- // the condition is met once the service is ENABLED/DISABLED
+ // the condition is met once the service is (ENABLING or ENABLED)/DISABLED
if (enabled) {
- return service.state === 'ENABLED';
+ return service.state === 'ENABLING' || service.state === 'ENABLED';
} else {
return service.state === 'DISABLED';
}
@@ -828,7 +828,7 @@ nf.ControllerService = (function () {
var notEnabled = false;
$.each(referencingComponents, function(_, referencingComponent) {
if (referencingComponent.referenceType === 'ControllerService') {
- if (referencingComponent.state !== 'ENABLED') {
+ if (referencingComponent.state !== 'ENABLING' && referencingComponent.state !== 'ENABLED') {
notEnabled = true;
}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4fd1e949/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
index 3715110..476f34b 100644
--- a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
+++ b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-settings.js
@@ -1741,7 +1741,7 @@ nf.Settings = (function () {
// if there are no bulletins clear all
var reportingTasks = reportingTasksData.getItems();
$.each(reportingTasks, function(_, reportingTask) {
- controllerServicesData.updateItem(reportingTask.id, $.extend(reportingTask, {
+ reportingTasksData.updateItem(reportingTask.id, $.extend(reportingTask, {
bulletins: []
}));
});
[10/10] incubator-nifi git commit: Merge branch 'develop' into
NIFI-724
Posted by mc...@apache.org.
Merge branch 'develop' into NIFI-724
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/eddc071b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/eddc071b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/eddc071b
Branch: refs/heads/develop
Commit: eddc071b8efc3c6ca86e373b56f214514b2abd0a
Parents: 566c4fe a091807
Author: Matt Gilman <ma...@gmail.com>
Authored: Thu Jul 2 16:27:05 2015 -0400
Committer: Matt Gilman <ma...@gmail.com>
Committed: Thu Jul 2 16:27:05 2015 -0400
----------------------------------------------------------------------
.../nifi/controller/ConfigurationContext.java | 23 ++-
.../src/main/asciidoc/developer-guide.adoc | 2 +-
.../nifi-nifi-example-nar/pom.xml | 35 ++++
.../nifi-nifi-example-processors/pom.xml | 70 +++++++
.../nifi/processors/WriteResourceToStream.java | 102 ++++++++++
.../org.apache.nifi.processor.Processor | 15 ++
.../src/main/resources/file.txt | 1 +
.../processors/WriteResourceToStreamTest.java | 47 +++++
nifi/nifi-external/nifi-example-bundle/pom.xml | 42 +++++
nifi/nifi-external/pom.xml | 2 +
.../nifi/util/MockConfigurationContext.java | 29 ++-
.../nifi/util/StandardProcessorTestRunner.java | 4 +-
.../nifi-framework/nifi-documentation/pom.xml | 5 +
.../apache/nifi/documentation/DocGenerator.java | 5 +-
.../html/HtmlDocumentationWriter.java | 75 ++++++--
.../init/ControllerServiceInitializer.java | 20 +-
.../init/ProcessorInitializer.java | 17 +-
.../init/ReportingTaskingInitializer.java | 14 +-
.../mock/MockConfigurationContext.java | 48 +++++
.../documentation/mock/MockProcessContext.java | 85 +++++++++
.../documentation/util/ReflectionUtils.java | 139 ++++++++++++++
.../nifi/documentation/DocGeneratorTest.java | 96 ++++++++++
.../FullyDocumentedControllerService.java | 63 +++++--
.../example/FullyDocumentedProcessor.java | 49 ++++-
.../example/FullyDocumentedReportingTask.java | 46 ++++-
.../html/HtmlDocumentationWriterTest.java | 33 +++-
.../html/ProcessorDocumentationWriterTest.java | 19 +-
.../src/test/resources/conf/nifi.properties | 129 +++++++++++++
.../src/test/resources/lib/example.nar | Bin 0 -> 721040 bytes
.../src/test/resources/lib/jetty.nar | Bin 0 -> 4638519 bytes
.../test/resources/lib/nifi-framework-nar.nar | Bin 0 -> 406 bytes
.../java/org/apache/nifi/nar/NarCloseable.java | 44 -----
.../nifi/nar/NarThreadContextClassLoader.java | 187 -------------------
.../apache/nifi/controller/FlowController.java | 2 +-
.../reporting/AbstractReportingTaskNode.java | 4 +-
.../repository/FileSystemRepository.java | 20 +-
.../repository/VolatileContentRepository.java | 8 +-
.../scheduling/StandardProcessScheduler.java | 24 +--
.../service/StandardConfigurationContext.java | 26 ++-
.../service/StandardControllerServiceNode.java | 2 +-
.../StandardControllerServiceProvider.java | 10 +-
...nifi.controller.repository.ContentRepository | 16 ++
...ifi.controller.repository.FlowFileRepository | 16 ++
...fi.controller.repository.FlowFileSwapManager | 15 ++
...ler.status.history.ComponentStatusRepository | 15 ++
.../java/org/apache/nifi/nar/NarCloseable.java | 44 +++++
.../nifi/nar/NarThreadContextClassLoader.java | 187 +++++++++++++++++++
.../nifi-web-ui/src/main/webapp/css/main.css | 1 +
.../src/main/webapp/js/nf/canvas/nf-canvas.js | 2 +-
.../js/nf/canvas/nf-controller-service.js | 4 +-
.../src/main/webapp/js/nf/nf-common.js | 17 ++
.../nifi/processors/kite/ConvertCSVToAvro.java | 76 ++++++--
.../nifi/processors/kite/ConvertJSONToAvro.java | 95 ++++++----
.../nifi/processors/kite/FailureTracker.java | 83 ++++++++
.../processors/kite/TestCSVToAvroProcessor.java | 96 +++++++++-
.../kite/TestJSONToAvroProcessor.java | 101 +++++++++-
.../nifi/processors/standard/ExtractText.java | 4 +-
.../apache/nifi/controller/MonitorMemory.java | 14 +-
58 files changed, 1923 insertions(+), 405 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eddc071b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eddc071b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-canvas.js
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/eddc071b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-ui/src/main/webapp/js/nf/canvas/nf-controller-service.js
----------------------------------------------------------------------