You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:46 UTC
[10/18] nifi git commit: NIFI-1563: - Federate requests and merge
responses from nodes instead of storing bulletins and stats at NCM - Updating
UI to support restructured status history DTO. - Return 'Insufficient
History' message if aggregate stats don'
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
deleted file mode 100644
index 1195bc9..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-public interface Heartbeater {
-
- void heartbeat();
-}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index e91ba9a..0d7f3ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
public interface ControllerServiceNode extends ConfiguredComponent {
@@ -64,10 +63,8 @@ public interface ControllerServiceNode extends ConfiguredComponent {
* initiate service enabling task as well as its re-tries
* @param administrativeYieldMillis
* the amount of milliseconds to wait for administrative yield
- * @param heartbeater
- * the instance of {@link Heartbeater}
*/
- void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
+ void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis);
/**
* Will disable this service. Disabling of the service typically means
@@ -76,10 +73,8 @@ public interface ControllerServiceNode extends ConfiguredComponent {
* @param scheduler
* implementation of {@link ScheduledExecutorService} used to
* initiate service disabling task
- * @param heartbeater
- * the instance of {@link Heartbeater}
*/
- void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
+ void disable(ScheduledExecutorService scheduler);
/**
* @return the ControllerServiceReference that describes which components are referencing this Controller Service
@@ -139,12 +134,12 @@ public interface ControllerServiceNode extends ConfiguredComponent {
/**
* Returns 'true' if this service is active. The service is considered to be
* active if and only if it's
- * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
+ * {@link #enable(ScheduledExecutorService, long)} operation
* has been invoked and the service has been transitioned to ENABLING state.
* The service will also remain 'active' after its been transitioned to
* ENABLED state. <br>
* The service will be de-activated upon invocation of
- * {@link #disable(ScheduledExecutorService, Heartbeater)}.
+ * {@link #disable(ScheduledExecutorService)}.
*/
boolean isActive();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
index ff3ad4e..1146a39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
@@ -20,18 +20,14 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.OutputStream;
-import java.util.List;
+
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Marshaller;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.controller.Counter;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.jaxb.CounterAdapter;
/**
* The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node.
@@ -50,23 +46,11 @@ public class HeartbeatPayload {
}
}
- private List<Counter> counters;
- private ProcessGroupStatus processGroupStatus;
private int activeThreadCount;
private long totalFlowFileCount;
private long totalFlowFileBytes;
- private SystemDiagnostics systemDiagnostics;
private long systemStartTime;
- @XmlJavaTypeAdapter(CounterAdapter.class)
- public List<Counter> getCounters() {
- return counters;
- }
-
- public void setCounters(final List<Counter> counters) {
- this.counters = counters;
- }
-
public int getActiveThreadCount() {
return activeThreadCount;
}
@@ -91,22 +75,6 @@ public class HeartbeatPayload {
this.totalFlowFileBytes = totalFlowFileBytes;
}
- public ProcessGroupStatus getProcessGroupStatus() {
- return processGroupStatus;
- }
-
- public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) {
- this.processGroupStatus = processGroupStatus;
- }
-
- public SystemDiagnostics getSystemDiagnostics() {
- return systemDiagnostics;
- }
-
- public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) {
- this.systemDiagnostics = systemDiagnostics;
- }
-
public long getSystemStartTime() {
return systemStartTime;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 1ef18c0..d43a3db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.StandardFlowFileQueue;
import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -71,7 +70,7 @@ public final class StandardConnection implements Connection {
relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
scheduler = builder.scheduler;
flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
- scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater);
+ scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
}
@@ -270,7 +269,6 @@ public final class StandardConnection implements Connection {
private FlowFileRepository flowFileRepository;
private ProvenanceEventRepository provenanceRepository;
private ResourceClaimManager resourceClaimManager;
- private Heartbeater heartbeater;
public Builder(final ProcessScheduler scheduler) {
this.scheduler = scheduler;
@@ -306,11 +304,6 @@ public final class StandardConnection implements Connection {
return this;
}
- public Builder heartbeater(final Heartbeater heartbeater) {
- this.heartbeater = heartbeater;
- return this;
- }
-
public Builder bendPoints(final List<Position> bendPoints) {
this.bendPoints.clear();
this.bendPoints.addAll(bendPoints);
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 09c4da6..632fa1a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,7 +16,40 @@
*/
package org.apache.nifi.controller;
-import com.sun.jersey.api.client.ClientHandlerException;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.action.Action;
import org.apache.nifi.admin.service.AuditService;
@@ -27,16 +60,13 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.cluster.BulletinsPayload;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.NodeProtocolSender;
import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.connectable.Connectable;
@@ -109,7 +139,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.events.NodeBulletinProcessingStrategy;
import org.apache.nifi.events.VolatileBulletinRepository;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -184,41 +213,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.net.ssl.SSLContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.sun.jersey.api.client.ClientHandlerException;
-public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider {
// default repository implementations
public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -308,7 +305,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
/**
* timer to periodically send heartbeats to the cluster
*/
- private ScheduledFuture<?> bulletinFuture;
private ScheduledFuture<?> heartbeatGeneratorFuture;
private ScheduledFuture<?> heartbeatSenderFuture;
@@ -318,8 +314,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
*/
private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
- private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
-
// guarded by rwLock
/**
* the node identifier;
@@ -420,7 +414,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
bulletinRepository = new VolatileBulletinRepository();
- nodeBulletinSubscriber = new AtomicReference<>();
try {
this.provenanceEventRepository = createProvenanceRepository(properties);
@@ -437,7 +430,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
throw new RuntimeException(e);
}
- processScheduler = new StandardProcessScheduler(this, this, encryptor, stateManagerProvider);
+ processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider);
eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
@@ -833,7 +826,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
.resourceClaimManager(resourceClaimManager)
.flowFileRepository(flowFileRepository)
.provenanceRepository(provenanceEventRepository)
- .heartbeater(this)
.build();
}
@@ -2122,7 +2114,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
final ProcessGroupStatus status = new ProcessGroupStatus();
status.setId(group.getIdentifier());
status.setName(group.getName());
- status.setCreationTimestamp(new Date().getTime());
int activeGroupThreads = 0;
long bytesRead = 0L;
long bytesWritten = 0L;
@@ -2899,7 +2890,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
public Counter resetCounter(final String identifier) {
final CounterRepository counterRepo = counterRepositoryRef.get();
final Counter resetValue = counterRepo.resetCounter(identifier);
- heartbeat();
return resetValue;
}
@@ -2955,8 +2945,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
stopHeartbeating();
- bulletinFuture = clusterTaskExecutor.scheduleWithFixedDelay(new BulletinsTask(protocolSender), 250, 2000, TimeUnit.MILLISECONDS);
-
final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask();
heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask);
heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
@@ -3007,10 +2995,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
if (heartbeatSenderFuture != null) {
heartbeatSenderFuture.cancel(false);
}
-
- if (bulletinFuture != null) {
- bulletinFuture.cancel(false);
- }
} finally {
writeLock.unlock();
}
@@ -3135,8 +3119,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
// update the bulletin repository
if (isChanging) {
if (clustered) {
- nodeBulletinSubscriber.set(new NodeBulletinProcessingStrategy());
- bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get());
stateManagerProvider.enableClusterProvider();
if (zooKeeperStateServer != null) {
@@ -3175,7 +3157,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
}
} else {
- bulletinRepository.restoreDefaultBulletinProcessing();
if (zooKeeperStateServer != null) {
zooKeeperStateServer.shutdown();
}
@@ -3487,6 +3468,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return replayFlowFile(record, requestor);
}
+ @SuppressWarnings("deprecation")
public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord event, final String requestor) throws IOException {
if (event == null) {
throw new NullPointerException();
@@ -3627,7 +3609,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- @Override
public void heartbeat() {
if (!isClustered()) {
return;
@@ -3642,110 +3623,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
}
}
- private class BulletinsTask implements Runnable {
-
- private final NodeProtocolSender protocolSender;
- private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
-
- public BulletinsTask(final NodeProtocolSender protocolSender) {
- if (protocolSender == null) {
- throw new IllegalArgumentException("NodeProtocolSender may not be null.");
- }
- this.protocolSender = protocolSender;
- }
-
- @Override
- public void run() {
- try {
- final NodeBulletinsMessage message = createBulletinsMessage();
- if (message == null) {
- return;
- }
-
- protocolSender.sendBulletins(message);
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- String.format(
- "Sending bulletins to cluster manager at %s",
- dateFormatter.format(new Date())));
- }
-
- } catch (final UnknownServiceAddressException usae) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(usae.getMessage());
- }
- } catch (final Exception ex) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to send bulletins to cluster manager due to: " + ex, ex);
- }
- }
- }
-
- private boolean isIllegalXmlChar(final char c) {
- return c < 0x20 && c != 0x09 && c != 0x0A && c != 0x0D;
- }
-
- private boolean containsIllegalXmlChars(final Bulletin bulletin) {
- final String message = bulletin.getMessage();
- for (int i = 0; i < message.length(); i++) {
- final char c = message.charAt(i);
- if (isIllegalXmlChar(c)) {
- return true;
- }
- }
-
- return false;
- }
-
- private String stripIllegalXmlChars(final String value) {
- final StringBuilder sb = new StringBuilder(value.length());
- for (int i = 0; i < value.length(); i++) {
- final char c = value.charAt(i);
- sb.append(isIllegalXmlChar(c) ? '?' : c);
- }
-
- return sb.toString();
- }
-
- private NodeBulletinsMessage createBulletinsMessage() {
- final Set<Bulletin> nodeBulletins = nodeBulletinSubscriber.get().getBulletins();
- final Set<Bulletin> escapedNodeBulletins = new HashSet<>(nodeBulletins.size());
-
- // ensure there are some bulletins to report
- if (nodeBulletins.isEmpty()) {
- return null;
- }
-
- for (final Bulletin bulletin : nodeBulletins) {
- final Bulletin escapedBulletin;
- if (containsIllegalXmlChars(bulletin)) {
- final String escapedBulletinMessage = stripIllegalXmlChars(bulletin.getMessage());
-
- if (bulletin.getGroupId() == null) {
- escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
- } else {
- escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
- bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
- }
- } else {
- escapedBulletin = bulletin;
- }
-
- escapedNodeBulletins.add(escapedBulletin);
- }
-
- // create the bulletin payload
- final BulletinsPayload payload = new BulletinsPayload();
- payload.setBulletins(escapedNodeBulletins);
-
- // create bulletin message
- final NodeBulletins bulletins = new NodeBulletins(getNodeId(), payload.marshal());
- final NodeBulletinsMessage message = new NodeBulletinsMessage();
- message.setBulletins(bulletins);
-
- return message;
- }
- }
private class HeartbeatSendTask implements Runnable {
@@ -3822,20 +3699,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
return null;
}
- final ProcessGroupStatus procGroupStatus = getGroupStatus(bean.getRootGroup(), getProcessorStats());
// create heartbeat payload
final HeartbeatPayload hbPayload = new HeartbeatPayload();
hbPayload.setSystemStartTime(systemStartTime);
- hbPayload.setActiveThreadCount(procGroupStatus.getActiveThreadCount());
+ hbPayload.setActiveThreadCount(getActiveThreadCount());
final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
- hbPayload.setCounters(getCounters());
- hbPayload.setSystemDiagnostics(getSystemDiagnostics());
- hbPayload.setProcessGroupStatus(procGroupStatus);
-
// create heartbeat message
final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal());
final HeartbeatMessage message = new HeartbeatMessage();
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 0f3ffe0..6735959 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -111,7 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final FlowFileRepository flowFileRepository;
private final ProvenanceEventRepository provRepository;
private final ResourceClaimManager resourceClaimManager;
- private final Heartbeater heartbeater;
private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
@@ -120,8 +119,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
private final ProcessScheduler scheduler;
public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
- final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold,
- final Heartbeater heartbeater) {
+ final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
priorities = new ArrayList<>();
swapQueue = new ArrayList<>();
@@ -135,7 +133,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
this.swapThreshold = swapThreshold;
this.scheduler = scheduler;
this.connection = connection;
- this.heartbeater = heartbeater;
readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
@@ -1182,9 +1179,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
dropRequest.setState(DropFlowFileState.COMPLETE);
- if (heartbeater != null) {
- heartbeater.heartbeat();
- }
} catch (final Exception e) {
logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
logger.error("", e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index f7e968e..3e4d3a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -37,7 +37,6 @@ import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
@@ -70,7 +69,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class);
private final ControllerServiceProvider controllerServiceProvider;
- private final Heartbeater heartbeater;
private final long administrativeYieldMillis;
private final String administrativeYieldDuration;
private final StateManagerProvider stateManagerProvider;
@@ -85,9 +83,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
private final StringEncryptor encryptor;
- public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
+ public StandardProcessScheduler(final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
final StateManagerProvider stateManagerProvider) {
- this.heartbeater = heartbeater;
this.controllerServiceProvider = controllerServiceProvider;
this.encryptor = encryptor;
this.stateManagerProvider = stateManagerProvider;
@@ -303,7 +300,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void trigger() {
getSchedulingAgent(procNode).schedule(procNode, scheduleState);
- heartbeater.heartbeat();
}
@Override
@@ -441,7 +437,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
try (final NarCloseable x = NarCloseable.withNarLoader()) {
ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
- heartbeater.heartbeat();
}
}
}
@@ -541,12 +536,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
@Override
public void enableControllerService(final ControllerServiceNode service) {
- service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
+ service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis);
}
@Override
public void disableControllerService(final ControllerServiceNode service) {
- service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
+ service.disable(this.componentLifeCycleThreadPool);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 3f24ff1..bed6a35 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -36,7 +36,6 @@ import org.apache.nifi.controller.AbstractConfiguredComponent;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ConfiguredComponent;
import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
import org.apache.nifi.controller.ValidationContextFactory;
import org.apache.nifi.controller.annotation.OnConfigured;
import org.apache.nifi.controller.exception.ComponentLifeCycleException;
@@ -273,8 +272,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
* as it reached ENABLED state.
*/
@Override
- public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis,
- final Heartbeater heartbeater) {
+ public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
this.active.set(true);
final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
@@ -287,13 +285,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
synchronized (active) {
shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
}
- if (shouldEnable) {
- heartbeater.heartbeat();
- } else {
+ if (!shouldEnable) {
LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
// Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
// set to DISABLING (see disable() operation)
- invokeDisable(configContext, heartbeater);
+ invokeDisable(configContext);
stateRef.set(ControllerServiceState.DISABLED);
}
} catch (Exception e) {
@@ -301,7 +297,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
- invokeDisable(configContext, heartbeater);
+ invokeDisable(configContext);
if (isActive()) {
scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
@@ -323,14 +319,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
* If such transition doesn't succeed (the service is still in ENABLING state)
* then the service will still be transitioned to DISABLING state to ensure that
* no other transition could happen on this service. However in such event
- * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
- * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
+ * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long)}
+ * operation will initiate service disabling (see the javadoc for {@link #enable(ScheduledExecutorService, long)}).
* <br>
* Upon successful invocation of @OnDisabled this service will be transitioned to
* DISABLED state.
*/
@Override
- public void disable(ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
+ public void disable(ScheduledExecutorService scheduler) {
/*
* The reason for synchronization is to ensure consistency of the
* service state when another thread is in the middle of enabling this
@@ -347,10 +343,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
@Override
public void run() {
try {
- invokeDisable(configContext, heartbeater);
+ invokeDisable(configContext);
} finally {
stateRef.set(ControllerServiceState.DISABLED);
- heartbeater.heartbeat();
}
}
});
@@ -362,7 +357,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
/**
*
*/
- private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) {
+ private void invokeDisable(ConfigurationContext configContext) {
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
} catch (Exception e) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
new file mode 100644
index 0000000..8b9f383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics for a Connection. Each constant wraps a
+ * {@link MetricDescriptor} whose {@link ValueMapper} reads one value from a
+ * {@link ConnectionStatus}.
+ */
+public enum ConnectionStatusDescriptor {
+    INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that were transferred to this Connection in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that were pulled from this Connection in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedBytes",
+        "Queued Bytes",
+        "The number of Bytes queued in this Connection",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getQueuedBytes();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedCount",
+        "Queued Count",
+        "The number of FlowFiles queued in this Connection",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getQueuedCount());
+            }
+        }));
+
+
+    /** The metric descriptor wrapped by this constant; assigned once by the constructor. */
+    private final MetricDescriptor<ConnectionStatus> descriptor;
+
+    private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the value of {@code getField()} on the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<ConnectionStatus> getDescriptor() {
+        return descriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..d5325d0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics for a Process Group. Each constant wraps a
+ * {@link MetricDescriptor} whose {@link ValueMapper} derives one value from a
+ * {@link ProcessGroupStatus}.
+ */
+public enum ProcessGroupStatusDescriptor {
+
+    BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
+        "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)",
+        "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputContentSize();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)",
+        "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputCount().longValue();
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputContentSize();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputCount().longValue();
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+        "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedContentSize();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+        "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedCount().longValue();
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
+        Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return calculateTaskMillis(status);
+            }
+        }));
+
+    /** The metric descriptor wrapped by this constant; assigned once by the constructor. */
+    private final MetricDescriptor<ProcessGroupStatus> descriptor;
+
+    private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the value of {@code getField()} on the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+
+
+    /**
+     * Totals the processing time of every Processor in the given group and in all of
+     * its descendant groups.
+     *
+     * @param status the status of the Process Group to total
+     * @return the total processing time, in milliseconds
+     */
+    private static long calculateTaskMillis(final ProcessGroupStatus status) {
+        return TimeUnit.MILLISECONDS.convert(calculateTaskNanos(status), TimeUnit.NANOSECONDS);
+    }
+
+    /**
+     * Sums {@code getProcessingNanos()} over the group's Processors and, recursively, over its
+     * child groups. The running total stays in nanoseconds at every level and is converted once
+     * by the caller, so a child's total is never added in a different unit than the accumulator.
+     *
+     * @param status the status of the Process Group to total
+     * @return the total processing time, in nanoseconds
+     */
+    private static long calculateTaskNanos(final ProcessGroupStatus status) {
+        long nanos = 0L;
+
+        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
+            nanos += procStatus.getProcessingNanos();
+        }
+
+        for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
+            nanos += calculateTaskNanos(childStatus);
+        }
+
+        return nanos;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
new file mode 100644
index 0000000..89e8aa0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics for a Processor. Each constant wraps a
+ * {@link MetricDescriptor} whose {@link ValueMapper} derives one value from a
+ * {@link ProcessorStatus}; the two averages also supply a {@link ValueReducer}.
+ */
+public enum ProcessorStatusDescriptor {
+    BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesRead",
+        "Bytes Read (5 mins)",
+        "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesWritten",
+        "Bytes Written (5 mins)",
+        "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesTransferred",
+        "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskCount",
+        "Tasks (5 mins)",
+        "The number of tasks that this Processor has completed in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInvocations());
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskMillis",
+        "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
+            }
+        })),
+
+    FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
+        "flowFilesRemoved",
+        "FlowFiles Removed (5 mins)",
+        "The total number of FlowFiles removed by this Processor in the last 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getFlowFilesRemoved());
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        }, new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long millis = 0L;
+                long count = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
+                    final long output = snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
+                    // Weight each snapshot's average by the same FlowFile count that feeds the divisor
+                    // (removed + transferred); weighting by 'removed' alone skewed the result toward zero.
+                    final long processed = removed + output;
+                    count += processed;
+
+                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    millis += avgMillis * processed;
+                }
+
+                return count == 0 ? 0L : millis / count;
+            }
+        }
+    )),
+
+    AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageTaskMillis",
+        "Average Task Duration",
+        "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations();
+            }
+        },
+        new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long procMillis = 0L;
+                long invocations = 0L; // long, so summing many snapshots cannot wrap an int
+
+                for (final StatusSnapshot snapshot : values) {
+                    procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
+                    invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).longValue();
+                }
+
+                if (invocations == 0) {
+                    return 0L;
+                }
+
+                return procMillis / invocations;
+            }
+        }));
+
+    /** The metric descriptor wrapped by this constant; assigned once by the constructor. */
+    private final MetricDescriptor<ProcessorStatus> descriptor;
+
+    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the value of {@code getField()} on the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<ProcessorStatus> getDescriptor() {
+        return descriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..0499d65
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics for a Remote Process Group. Each constant wraps a
+ * {@link MetricDescriptor} whose {@link ValueMapper} derives one value from a
+ * {@link RemoteProcessGroupStatus}; the average also supplies a {@link ValueReducer}.
+ */
+public enum RemoteProcessGroupStatusDescriptor {
+    SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)",
+        "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getSentContentSize();
+            }
+        })),
+
+    SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)",
+        "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)",
+        "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getReceivedContentSize();
+            }
+        })),
+
+    RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)",
+        "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getReceivedCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second",
+        "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getReceivedContentSize().longValue() / 300L); // 300 seconds = the 5 minute window named in the description
+            }
+        })),
+
+    SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second",
+        "The data rate at which data was sent to the remote system in the past 5 minutes in terms of Bytes Per Second", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentContentSize().longValue() / 300L); // 300 seconds = the 5 minute window named in the description
+            }
+        })),
+
+    TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second",
+        "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L); // 300 seconds = the 5 minute window
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
+        Formatter.DURATION,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        }, new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long millis = 0L;
+                long count = 0L; // long, so adding the long 'sent' values is not silently narrowed to int
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
+                    count += sent;
+
+                    // Weight each snapshot's average by the number of FlowFiles sent in that snapshot.
+                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    final long totalMillis = avgMillis * sent;
+                    millis += totalMillis;
+                }
+
+                return count == 0 ? 0L : millis / count;
+            }
+        }));
+
+    /** The metric descriptor wrapped by this constant; assigned once by the constructor. */
+    private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
+
+    private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /**
+     * @return the value of {@code getField()} on the wrapped descriptor
+     */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /**
+     * @return the metric descriptor wrapped by this constant
+     */
+    public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index 014b0a6..756e576 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -44,9 +44,9 @@ public class StatusHistoryUtil {
final StatusHistoryDTO dto = new StatusHistoryDTO();
dto.setGenerated(new Date());
- dto.setDetails(componentDetails);
+ dto.setComponentDetails(componentDetails);
dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors));
- dto.setStatusSnapshots(snapshotDtos);
+ dto.setAggregateSnapshots(snapshotDtos);
return dto;
}