You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2016/04/04 18:28:46 UTC

[10/18] nifi git commit: NIFI-1563: - Federate requests and merge responses from nodes instead of storing bulletins and stats at NCM - Updating UI to support restructured status history DTO. - Return 'Insufficient History' message if aggregate stats don't [... subject truncated by the mail archive]

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
deleted file mode 100644
index 1195bc9..0000000
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/Heartbeater.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.controller;
-
-public interface Heartbeater {
-
-    void heartbeat();
-}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
index e91ba9a..0d7f3ff 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/service/ControllerServiceNode.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService;
 
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 
 public interface ControllerServiceNode extends ConfiguredComponent {
 
@@ -64,10 +63,8 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      *            initiate service enabling task as well as its re-tries
      * @param administrativeYieldMillis
      *            the amount of milliseconds to wait for administrative yield
-     * @param heartbeater
-     *            the instance of {@link Heartbeater}
      */
-    void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis, Heartbeater heartbeater);
+    void enable(ScheduledExecutorService scheduler, long administrativeYieldMillis);
 
     /**
      * Will disable this service. Disabling of the service typically means
@@ -76,10 +73,8 @@ public interface ControllerServiceNode extends ConfiguredComponent {
      * @param scheduler
      *            implementation of {@link ScheduledExecutorService} used to
      *            initiate service disabling task
-     * @param heartbeater
-     *            the instance of {@link Heartbeater}
      */
-    void disable(ScheduledExecutorService scheduler, Heartbeater heartbeater);
+    void disable(ScheduledExecutorService scheduler);
 
     /**
      * @return the ControllerServiceReference that describes which components are referencing this Controller Service
@@ -139,12 +134,12 @@ public interface ControllerServiceNode extends ConfiguredComponent {
     /**
      * Returns 'true' if this service is active. The service is considered to be
      * active if and only if it's
-     * {@link #enable(ScheduledExecutorService, long, Heartbeater)} operation
+     * {@link #enable(ScheduledExecutorService, long)} operation
      * has been invoked and the service has been transitioned to ENABLING state.
      * The service will also remain 'active' after its been transitioned to
      * ENABLED state. <br>
      * The service will be de-activated upon invocation of
-     * {@link #disable(ScheduledExecutorService, Heartbeater)}.
+     * {@link #disable(ScheduledExecutorService)}.
      */
     boolean isActive();
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
index ff3ad4e..1146a39 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/cluster/HeartbeatPayload.java
@@ -20,18 +20,14 @@ import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
-import java.util.List;
+
 import javax.xml.bind.JAXBContext;
 import javax.xml.bind.JAXBException;
 import javax.xml.bind.Marshaller;
 import javax.xml.bind.Unmarshaller;
 import javax.xml.bind.annotation.XmlRootElement;
-import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+
 import org.apache.nifi.cluster.protocol.ProtocolException;
-import org.apache.nifi.controller.Counter;
-import org.apache.nifi.controller.status.ProcessGroupStatus;
-import org.apache.nifi.diagnostics.SystemDiagnostics;
-import org.apache.nifi.jaxb.CounterAdapter;
 
 /**
  * The payload of the heartbeat. The payload contains status to inform the cluster manager the current workload of this node.
@@ -50,23 +46,11 @@ public class HeartbeatPayload {
         }
     }
 
-    private List<Counter> counters;
-    private ProcessGroupStatus processGroupStatus;
     private int activeThreadCount;
     private long totalFlowFileCount;
     private long totalFlowFileBytes;
-    private SystemDiagnostics systemDiagnostics;
     private long systemStartTime;
 
-    @XmlJavaTypeAdapter(CounterAdapter.class)
-    public List<Counter> getCounters() {
-        return counters;
-    }
-
-    public void setCounters(final List<Counter> counters) {
-        this.counters = counters;
-    }
-
     public int getActiveThreadCount() {
         return activeThreadCount;
     }
@@ -91,22 +75,6 @@ public class HeartbeatPayload {
         this.totalFlowFileBytes = totalFlowFileBytes;
     }
 
-    public ProcessGroupStatus getProcessGroupStatus() {
-        return processGroupStatus;
-    }
-
-    public void setProcessGroupStatus(final ProcessGroupStatus processGroupStatus) {
-        this.processGroupStatus = processGroupStatus;
-    }
-
-    public SystemDiagnostics getSystemDiagnostics() {
-        return systemDiagnostics;
-    }
-
-    public void setSystemDiagnostics(final SystemDiagnostics systemDiagnostics) {
-        this.systemDiagnostics = systemDiagnostics;
-    }
-
     public long getSystemStartTime() {
         return systemStartTime;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
index 1ef18c0..d43a3db 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/connectable/StandardConnection.java
@@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.StandardFlowFileQueue;
 import org.apache.nifi.controller.queue.FlowFileQueue;
@@ -71,7 +70,7 @@ public final class StandardConnection implements Connection {
         relationships = new AtomicReference<>(Collections.unmodifiableCollection(builder.relationships));
         scheduler = builder.scheduler;
         flowFileQueue = new StandardFlowFileQueue(id, this, builder.flowFileRepository, builder.provenanceRepository, builder.resourceClaimManager,
-            scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold(), builder.heartbeater);
+            scheduler, builder.swapManager, builder.eventReporter, NiFiProperties.getInstance().getQueueSwapThreshold());
         hashCode = new HashCodeBuilder(7, 67).append(id).toHashCode();
     }
 
@@ -270,7 +269,6 @@ public final class StandardConnection implements Connection {
         private FlowFileRepository flowFileRepository;
         private ProvenanceEventRepository provenanceRepository;
         private ResourceClaimManager resourceClaimManager;
-        private Heartbeater heartbeater;
 
         public Builder(final ProcessScheduler scheduler) {
             this.scheduler = scheduler;
@@ -306,11 +304,6 @@ public final class StandardConnection implements Connection {
             return this;
         }
 
-        public Builder heartbeater(final Heartbeater heartbeater) {
-            this.heartbeater = heartbeater;
-            return this;
-        }
-
         public Builder bendPoints(final List<Position> bendPoints) {
             this.bendPoints.clear();
             this.bendPoints.addAll(bendPoints);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 09c4da6..632fa1a 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -16,7 +16,40 @@
  */
 package org.apache.nifi.controller;
 
-import com.sun.jersey.api.client.ClientHandlerException;
+import static java.util.Objects.requireNonNull;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.LockSupport;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import javax.net.ssl.SSLContext;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.action.Action;
 import org.apache.nifi.admin.service.AuditService;
@@ -27,16 +60,13 @@ import org.apache.nifi.annotation.lifecycle.OnRemoved;
 import org.apache.nifi.annotation.lifecycle.OnShutdown;
 import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
 import org.apache.nifi.annotation.notification.PrimaryNodeState;
-import org.apache.nifi.cluster.BulletinsPayload;
 import org.apache.nifi.cluster.HeartbeatPayload;
 import org.apache.nifi.cluster.protocol.DataFlow;
 import org.apache.nifi.cluster.protocol.Heartbeat;
-import org.apache.nifi.cluster.protocol.NodeBulletins;
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.NodeProtocolSender;
 import org.apache.nifi.cluster.protocol.UnknownServiceAddressException;
 import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
-import org.apache.nifi.cluster.protocol.message.NodeBulletinsMessage;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.state.StateManagerProvider;
 import org.apache.nifi.connectable.Connectable;
@@ -109,7 +139,6 @@ import org.apache.nifi.encrypt.StringEncryptor;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.BulletinFactory;
 import org.apache.nifi.events.EventReporter;
-import org.apache.nifi.events.NodeBulletinProcessingStrategy;
 import org.apache.nifi.events.VolatileBulletinRepository;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
@@ -184,41 +213,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeerConfig.ConfigException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.net.ssl.SSLContext;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.LockSupport;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import static java.util.Objects.requireNonNull;
+import com.sun.jersey.api.client.ClientHandlerException;
 
-public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, Heartbeater, QueueProvider {
+public class FlowController implements EventAccess, ControllerServiceProvider, ReportingTaskProvider, QueueProvider {
 
     // default repository implementations
     public static final String DEFAULT_FLOWFILE_REPO_IMPLEMENTATION = "org.apache.nifi.controller.repository.WriteAheadFlowFileRepository";
@@ -308,7 +305,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     /**
      * timer to periodically send heartbeats to the cluster
      */
-    private ScheduledFuture<?> bulletinFuture;
     private ScheduledFuture<?> heartbeatGeneratorFuture;
     private ScheduledFuture<?> heartbeatSenderFuture;
 
@@ -318,8 +314,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
      */
     private final AtomicReference<HeartbeatMessageGeneratorTask> heartbeatMessageGeneratorTaskRef = new AtomicReference<>(null);
 
-    private final AtomicReference<NodeBulletinProcessingStrategy> nodeBulletinSubscriber;
-
     // guarded by rwLock
     /**
      * the node identifier;
@@ -420,7 +414,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         counterRepositoryRef = new AtomicReference<CounterRepository>(new StandardCounterRepository());
 
         bulletinRepository = new VolatileBulletinRepository();
-        nodeBulletinSubscriber = new AtomicReference<>();
 
         try {
             this.provenanceEventRepository = createProvenanceRepository(properties);
@@ -437,7 +430,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             throw new RuntimeException(e);
         }
 
-        processScheduler = new StandardProcessScheduler(this, this, encryptor, stateManagerProvider);
+        processScheduler = new StandardProcessScheduler(this, encryptor, stateManagerProvider);
         eventDrivenWorkerQueue = new EventDrivenWorkerQueue(false, false, processScheduler);
         controllerServiceProvider = new StandardControllerServiceProvider(processScheduler, bulletinRepository, stateManagerProvider);
 
@@ -833,7 +826,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             .resourceClaimManager(resourceClaimManager)
             .flowFileRepository(flowFileRepository)
             .provenanceRepository(provenanceEventRepository)
-            .heartbeater(this)
             .build();
     }
 
@@ -2122,7 +2114,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         final ProcessGroupStatus status = new ProcessGroupStatus();
         status.setId(group.getIdentifier());
         status.setName(group.getName());
-        status.setCreationTimestamp(new Date().getTime());
         int activeGroupThreads = 0;
         long bytesRead = 0L;
         long bytesWritten = 0L;
@@ -2899,7 +2890,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
     public Counter resetCounter(final String identifier) {
         final CounterRepository counterRepo = counterRepositoryRef.get();
         final Counter resetValue = counterRepo.resetCounter(identifier);
-        heartbeat();
         return resetValue;
     }
 
@@ -2955,8 +2945,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
 
             stopHeartbeating();
 
-            bulletinFuture = clusterTaskExecutor.scheduleWithFixedDelay(new BulletinsTask(protocolSender), 250, 2000, TimeUnit.MILLISECONDS);
-
             final HeartbeatMessageGeneratorTask heartbeatMessageGeneratorTask = new HeartbeatMessageGeneratorTask();
             heartbeatMessageGeneratorTaskRef.set(heartbeatMessageGeneratorTask);
             heartbeatGeneratorFuture = clusterTaskExecutor.scheduleWithFixedDelay(heartbeatMessageGeneratorTask, 0, heartbeatDelaySeconds, TimeUnit.SECONDS);
@@ -3007,10 +2995,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             if (heartbeatSenderFuture != null) {
                 heartbeatSenderFuture.cancel(false);
             }
-
-            if (bulletinFuture != null) {
-                bulletinFuture.cancel(false);
-            }
         } finally {
             writeLock.unlock();
         }
@@ -3135,8 +3119,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
             // update the bulletin repository
             if (isChanging) {
                 if (clustered) {
-                    nodeBulletinSubscriber.set(new NodeBulletinProcessingStrategy());
-                    bulletinRepository.overrideDefaultBulletinProcessing(nodeBulletinSubscriber.get());
                     stateManagerProvider.enableClusterProvider();
 
                     if (zooKeeperStateServer != null) {
@@ -3175,7 +3157,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                         LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1L));
                     }
                 } else {
-                    bulletinRepository.restoreDefaultBulletinProcessing();
                     if (zooKeeperStateServer != null) {
                         zooKeeperStateServer.shutdown();
                     }
@@ -3487,6 +3468,7 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         return replayFlowFile(record, requestor);
     }
 
+    @SuppressWarnings("deprecation")
     public ProvenanceEventRecord replayFlowFile(final ProvenanceEventRecord event, final String requestor) throws IOException {
         if (event == null) {
             throw new NullPointerException();
@@ -3627,7 +3609,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
-    @Override
     public void heartbeat() {
         if (!isClustered()) {
             return;
@@ -3642,110 +3623,6 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
         }
     }
 
-    private class BulletinsTask implements Runnable {
-
-        private final NodeProtocolSender protocolSender;
-        private final DateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS", Locale.US);
-
-        public BulletinsTask(final NodeProtocolSender protocolSender) {
-            if (protocolSender == null) {
-                throw new IllegalArgumentException("NodeProtocolSender may not be null.");
-            }
-            this.protocolSender = protocolSender;
-        }
-
-        @Override
-        public void run() {
-            try {
-                final NodeBulletinsMessage message = createBulletinsMessage();
-                if (message == null) {
-                    return;
-                }
-
-                protocolSender.sendBulletins(message);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(
-                        String.format(
-                            "Sending bulletins to cluster manager at %s",
-                            dateFormatter.format(new Date())));
-                }
-
-            } catch (final UnknownServiceAddressException usae) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug(usae.getMessage());
-                }
-            } catch (final Exception ex) {
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Failed to send bulletins to cluster manager due to: " + ex, ex);
-                }
-            }
-        }
-
-        private boolean isIllegalXmlChar(final char c) {
-            return c < 0x20 && c != 0x09 && c != 0x0A && c != 0x0D;
-        }
-
-        private boolean containsIllegalXmlChars(final Bulletin bulletin) {
-            final String message = bulletin.getMessage();
-            for (int i = 0; i < message.length(); i++) {
-                final char c = message.charAt(i);
-                if (isIllegalXmlChar(c)) {
-                    return true;
-                }
-            }
-
-            return false;
-        }
-
-        private String stripIllegalXmlChars(final String value) {
-            final StringBuilder sb = new StringBuilder(value.length());
-            for (int i = 0; i < value.length(); i++) {
-                final char c = value.charAt(i);
-                sb.append(isIllegalXmlChar(c) ? '?' : c);
-            }
-
-            return sb.toString();
-        }
-
-        private NodeBulletinsMessage createBulletinsMessage() {
-            final Set<Bulletin> nodeBulletins = nodeBulletinSubscriber.get().getBulletins();
-            final Set<Bulletin> escapedNodeBulletins = new HashSet<>(nodeBulletins.size());
-
-            // ensure there are some bulletins to report
-            if (nodeBulletins.isEmpty()) {
-                return null;
-            }
-
-            for (final Bulletin bulletin : nodeBulletins) {
-                final Bulletin escapedBulletin;
-                if (containsIllegalXmlChars(bulletin)) {
-                    final String escapedBulletinMessage = stripIllegalXmlChars(bulletin.getMessage());
-
-                    if (bulletin.getGroupId() == null) {
-                        escapedBulletin = BulletinFactory.createBulletin(bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
-                    } else {
-                        escapedBulletin = BulletinFactory.createBulletin(bulletin.getGroupId(), bulletin.getSourceId(), bulletin.getSourceType(),
-                            bulletin.getSourceName(), bulletin.getCategory(), bulletin.getLevel(), escapedBulletinMessage);
-                    }
-                } else {
-                    escapedBulletin = bulletin;
-                }
-
-                escapedNodeBulletins.add(escapedBulletin);
-            }
-
-            // create the bulletin payload
-            final BulletinsPayload payload = new BulletinsPayload();
-            payload.setBulletins(escapedNodeBulletins);
-
-            // create bulletin message
-            final NodeBulletins bulletins = new NodeBulletins(getNodeId(), payload.marshal());
-            final NodeBulletinsMessage message = new NodeBulletinsMessage();
-            message.setBulletins(bulletins);
-
-            return message;
-        }
-    }
 
     private class HeartbeatSendTask implements Runnable {
 
@@ -3822,20 +3699,15 @@ public class FlowController implements EventAccess, ControllerServiceProvider, R
                     return null;
                 }
 
-                final ProcessGroupStatus procGroupStatus = getGroupStatus(bean.getRootGroup(), getProcessorStats());
                 // create heartbeat payload
                 final HeartbeatPayload hbPayload = new HeartbeatPayload();
                 hbPayload.setSystemStartTime(systemStartTime);
-                hbPayload.setActiveThreadCount(procGroupStatus.getActiveThreadCount());
+                hbPayload.setActiveThreadCount(getActiveThreadCount());
 
                 final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
                 hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
                 hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
 
-                hbPayload.setCounters(getCounters());
-                hbPayload.setSystemDiagnostics(getSystemDiagnostics());
-                hbPayload.setProcessGroupStatus(procGroupStatus);
-
                 // create heartbeat message
                 final Heartbeat heartbeat = new Heartbeat(getNodeId(), bean.isPrimary(), bean.isConnected(), hbPayload.marshal());
                 final HeartbeatMessage message = new HeartbeatMessage();

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
index 0f3ffe0..6735959 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/StandardFlowFileQueue.java
@@ -111,7 +111,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final FlowFileRepository flowFileRepository;
     private final ProvenanceEventRepository provRepository;
     private final ResourceClaimManager resourceClaimManager;
-    private final Heartbeater heartbeater;
 
     private final ConcurrentMap<String, DropFlowFileRequest> dropRequestMap = new ConcurrentHashMap<>();
     private final ConcurrentMap<String, ListFlowFileRequest> listRequestMap = new ConcurrentHashMap<>();
@@ -120,8 +119,7 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
     private final ProcessScheduler scheduler;
 
     public StandardFlowFileQueue(final String identifier, final Connection connection, final FlowFileRepository flowFileRepo, final ProvenanceEventRepository provRepo,
-        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold,
-        final Heartbeater heartbeater) {
+        final ResourceClaimManager resourceClaimManager, final ProcessScheduler scheduler, final FlowFileSwapManager swapManager, final EventReporter eventReporter, final int swapThreshold) {
         activeQueue = new PriorityQueue<>(20, new Prioritizer(new ArrayList<FlowFilePrioritizer>()));
         priorities = new ArrayList<>();
         swapQueue = new ArrayList<>();
@@ -135,7 +133,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
         this.swapThreshold = swapThreshold;
         this.scheduler = scheduler;
         this.connection = connection;
-        this.heartbeater = heartbeater;
 
         readLock = new TimedLock(this.lock.readLock(), identifier + " Read Lock", 100);
         writeLock = new TimedLock(this.lock.writeLock(), identifier + " Write Lock", 100);
@@ -1182,9 +1179,6 @@ public final class StandardFlowFileQueue implements FlowFileQueue {
                         logger.info("Successfully dropped {} FlowFiles ({} bytes) from Connection with ID {} on behalf of {}",
                             dropRequest.getDroppedSize().getObjectCount(), dropRequest.getDroppedSize().getByteCount(), StandardFlowFileQueue.this.getIdentifier(), requestor);
                         dropRequest.setState(DropFlowFileState.COMPLETE);
-                        if (heartbeater != null) {
-                            heartbeater.heartbeat();
-                        }
                     } catch (final Exception e) {
                         logger.error("Failed to drop FlowFiles from Connection with ID {} due to {}", StandardFlowFileQueue.this.getIdentifier(), e.toString());
                         logger.error("", e);

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
index f7e968e..3e4d3a3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java
@@ -37,7 +37,6 @@ import org.apache.nifi.connectable.Funnel;
 import org.apache.nifi.connectable.Port;
 import org.apache.nifi.controller.AbstractPort;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ReportingTaskNode;
@@ -70,7 +69,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
     private static final Logger LOG = LoggerFactory.getLogger(StandardProcessScheduler.class);
 
     private final ControllerServiceProvider controllerServiceProvider;
-    private final Heartbeater heartbeater;
     private final long administrativeYieldMillis;
     private final String administrativeYieldDuration;
     private final StateManagerProvider stateManagerProvider;
@@ -85,9 +83,8 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     private final StringEncryptor encryptor;
 
-    public StandardProcessScheduler(final Heartbeater heartbeater, final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
+    public StandardProcessScheduler(final ControllerServiceProvider controllerServiceProvider, final StringEncryptor encryptor,
         final StateManagerProvider stateManagerProvider) {
-        this.heartbeater = heartbeater;
         this.controllerServiceProvider = controllerServiceProvider;
         this.encryptor = encryptor;
         this.stateManagerProvider = stateManagerProvider;
@@ -303,7 +300,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             @Override
             public void trigger() {
                 getSchedulingAgent(procNode).schedule(procNode, scheduleState);
-                heartbeater.heartbeat();
             }
 
             @Override
@@ -441,7 +437,6 @@ public final class StandardProcessScheduler implements ProcessScheduler {
             final ConnectableProcessContext processContext = new ConnectableProcessContext(connectable, encryptor, getStateManager(connectable.getIdentifier()));
             try (final NarCloseable x = NarCloseable.withNarLoader()) {
                 ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnStopped.class, connectable, processContext);
-                heartbeater.heartbeat();
             }
         }
     }
@@ -541,12 +536,12 @@ public final class StandardProcessScheduler implements ProcessScheduler {
 
     @Override
     public void enableControllerService(final ControllerServiceNode service) {
-        service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis, this.heartbeater);
+        service.enable(this.componentLifeCycleThreadPool, this.administrativeYieldMillis);
     }
 
     @Override
     public void disableControllerService(final ControllerServiceNode service) {
-        service.disable(this.componentLifeCycleThreadPool, this.heartbeater);
+        service.disable(this.componentLifeCycleThreadPool);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
index 3f24ff1..bed6a35 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/StandardControllerServiceNode.java
@@ -36,7 +36,6 @@ import org.apache.nifi.controller.AbstractConfiguredComponent;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.ConfiguredComponent;
 import org.apache.nifi.controller.ControllerService;
-import org.apache.nifi.controller.Heartbeater;
 import org.apache.nifi.controller.ValidationContextFactory;
 import org.apache.nifi.controller.annotation.OnConfigured;
 import org.apache.nifi.controller.exception.ComponentLifeCycleException;
@@ -273,8 +272,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * as it reached ENABLED state.
      */
     @Override
-    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis,
-            final Heartbeater heartbeater) {
+    public void enable(final ScheduledExecutorService scheduler, final long administrativeYieldMillis) {
         if (this.stateRef.compareAndSet(ControllerServiceState.DISABLED, ControllerServiceState.ENABLING)) {
             this.active.set(true);
             final ConfigurationContext configContext = new StandardConfigurationContext(this, this.serviceProvider, null);
@@ -287,13 +285,11 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                         synchronized (active) {
                             shouldEnable = active.get() && stateRef.compareAndSet(ControllerServiceState.ENABLING, ControllerServiceState.ENABLED);
                         }
-                        if (shouldEnable) {
-                            heartbeater.heartbeat();
-                        } else {
+                        if (!shouldEnable) {
                             LOG.debug("Disabling service " + this + " after it has been enabled due to disable action being initiated.");
                             // Can only happen if user initiated DISABLE operation before service finished enabling. It's state will be
                             // set to DISABLING (see disable() operation)
-                            invokeDisable(configContext, heartbeater);
+                            invokeDisable(configContext);
                             stateRef.set(ControllerServiceState.DISABLED);
                         }
                     } catch (Exception e) {
@@ -301,7 +297,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                         final ComponentLog componentLog = new SimpleProcessLogger(getIdentifier(), StandardControllerServiceNode.this);
                         componentLog.error("Failed to invoke @OnEnabled method due to {}", cause);
                         LOG.error("Failed to invoke @OnEnabled method of {} due to {}", getControllerServiceImplementation(), cause.toString());
-                        invokeDisable(configContext, heartbeater);
+                        invokeDisable(configContext);
 
                         if (isActive()) {
                             scheduler.schedule(this, administrativeYieldMillis, TimeUnit.MILLISECONDS);
@@ -323,14 +319,14 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
      * If such transition doesn't succeed (the service is still in ENABLING state)
      * then the service will still be transitioned to DISABLING state to ensure that
      * no other transition could happen on this service. However in such event
-     * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long, Heartbeater)}
-     * operation will initiate service disabling javadoc for (see {@link #enable(ScheduledExecutorService, long, Heartbeater)}
+     * (e.g., its @OnEnabled finally succeeded), the {@link #enable(ScheduledExecutorService, long)}
+     * operation will initiate service disabling (see javadoc for {@link #enable(ScheduledExecutorService, long)}).
      * <br>
      * Upon successful invocation of @OnDisabled this service will be transitioned to
      * DISABLED state.
      */
     @Override
-    public void disable(ScheduledExecutorService scheduler, final Heartbeater heartbeater) {
+    public void disable(ScheduledExecutorService scheduler) {
         /*
          * The reason for synchronization is to ensure consistency of the
          * service state when another thread is in the middle of enabling this
@@ -347,10 +343,9 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
                 @Override
                 public void run() {
                     try {
-                        invokeDisable(configContext, heartbeater);
+                        invokeDisable(configContext);
                     } finally {
                         stateRef.set(ControllerServiceState.DISABLED);
-                        heartbeater.heartbeat();
                     }
                 }
             });
@@ -362,7 +357,7 @@ public class StandardControllerServiceNode extends AbstractConfiguredComponent i
     /**
      *
      */
-    private void invokeDisable(ConfigurationContext configContext, Heartbeater heartbeater) {
+    private void invokeDisable(ConfigurationContext configContext) {
         try {
             ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, StandardControllerServiceNode.this.getControllerServiceImplementation(), configContext);
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
new file mode 100644
index 0000000..8b9f383
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ConnectionStatusDescriptor.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum ConnectionStatusDescriptor {
+    INPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that were transferred to this Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that were transferred to this Connection in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that were pulled from this Connection in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that were pulled from this Connection in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedBytes",
+        "Queued Bytes",
+        "The number of Bytes queued in this Connection",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return status.getQueuedBytes();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ConnectionStatus>(
+        "queuedCount",
+        "Queued Count",
+        "The number of FlowFiles queued in this Connection",
+        Formatter.COUNT,
+        new ValueMapper<ConnectionStatus>() {
+            @Override
+            public Long getValue(final ConnectionStatus status) {
+                return Long.valueOf(status.getQueuedCount());
+            }
+        }));
+
+
+    private final MetricDescriptor<ConnectionStatus> descriptor; // assigned once per constant; final because enum constants are shared
+
+    private ConnectionStatusDescriptor(final MetricDescriptor<ConnectionStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<ConnectionStatus> getDescriptor() {
+        return descriptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..d5325d0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessGroupStatusDescriptor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum ProcessGroupStatusDescriptor {
+
+    BYTES_READ(new StandardMetricDescriptor<ProcessGroupStatus>("bytesRead", "Bytes Read (5 mins)",
+        "The total number of bytes read from Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessGroupStatus>("bytesWritten", "Bytes Written (5 mins)",
+        "The total number of bytes written to Content Repository by Processors in this Process Group in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessGroupStatus>("bytesTransferred", "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to Content Repository by Processors in this Process Group in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("inputBytes", "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputContentSize();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("inputCount", "FlowFiles In (5 mins)",
+        "The number of FlowFiles that have entered this Process Group via its Input Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getInputCount().longValue();
+            }
+        })),
+
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("outputBytes", "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputContentSize();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("outputCount", "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that have exited this Process Group via its Output Ports in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getOutputCount().longValue();
+            }
+        })),
+
+    QUEUED_BYTES(new StandardMetricDescriptor<ProcessGroupStatus>("queuedBytes", "Queued Bytes",
+        "The cumulative size of all FlowFiles queued in all Connections of this Process Group",
+        Formatter.DATA_SIZE, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedContentSize();
+            }
+        })),
+
+    QUEUED_COUNT(new StandardMetricDescriptor<ProcessGroupStatus>("queuedCount", "Queued Count",
+        "The number of FlowFiles queued in all Connections of this Process Group", Formatter.COUNT, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return status.getQueuedCount().longValue();
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessGroupStatus>("taskMillis", "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processors within this ProcessGroup have used to complete their tasks in the past 5 minutes",
+        Formatter.DURATION, new ValueMapper<ProcessGroupStatus>() {
+            @Override
+            public Long getValue(final ProcessGroupStatus status) {
+                return calculateTaskMillis(status);
+            }
+        }));
+
+    private final MetricDescriptor<ProcessGroupStatus> descriptor; // assigned once per constant; final because enum constants are shared
+
+    private ProcessGroupStatusDescriptor(final MetricDescriptor<ProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<ProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+
+
+    // Total thread time, in milliseconds, used by the Processors of this group and of every descendant group.
+    private static long calculateTaskMillis(final ProcessGroupStatus status) {
+        long nanos = 0L;
+        for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
+            nanos += procStatus.getProcessingNanos();
+        }
+        for (final ProcessGroupStatus childStatus : status.getProcessGroupStatus()) {
+            // The recursive call returns milliseconds; scale it back to nanoseconds before adding it to the nanosecond sum.
+            nanos += TimeUnit.NANOSECONDS.convert(calculateTaskMillis(childStatus), TimeUnit.MILLISECONDS);
+        }
+
+        return TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
new file mode 100644
index 0000000..89e8aa0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/ProcessorStatusDescriptor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+public enum ProcessorStatusDescriptor {
+    BYTES_READ(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesRead",
+        "Bytes Read (5 mins)",
+        "The total number of bytes read from the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead();
+            }
+        })),
+
+    BYTES_WRITTEN(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesWritten",
+        "Bytes Written (5 mins)",
+        "The total number of bytes written to the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesWritten();
+            }
+        })),
+
+    BYTES_TRANSFERRED(new StandardMetricDescriptor<ProcessorStatus>(
+        "bytesTransferred",
+        "Bytes Transferred (5 mins)",
+        "The total number of bytes read from or written to the Content Repository by this Processor in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getBytesRead() + status.getBytesWritten();
+            }
+        })),
+
+    INPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputBytes",
+        "Bytes In (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInputBytes();
+            }
+        })),
+
+    INPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "inputCount",
+        "FlowFiles In (5 mins)",
+        "The number of FlowFiles that this Processor has pulled from its queues in the past 5 minutes",
+        Formatter.COUNT, new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInputCount());
+            }
+        })),
+    OUTPUT_BYTES(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputBytes",
+        "Bytes Out (5 mins)",
+        "The cumulative size of all FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
+        Formatter.DATA_SIZE,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getOutputBytes();
+            }
+        })),
+
+    OUTPUT_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "outputCount",
+        "FlowFiles Out (5 mins)",
+        "The number of FlowFiles that this Processor has transferred to downstream queues in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getOutputCount());
+            }
+        })),
+
+    TASK_COUNT(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskCount",
+        "Tasks (5 mins)",
+        "The number of tasks that this Processor has completed in the past 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getInvocations());
+            }
+        })),
+
+    TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "taskMillis",
+        "Total Task Duration (5 mins)",
+        "The total number of thread-milliseconds that the Processor has used to complete its tasks in the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS);
+            }
+        })),
+
+    FLOWFILES_REMOVED(new StandardMetricDescriptor<ProcessorStatus>(
+        "flowFilesRemoved",
+        "FlowFiles Removed (5 mins)",
+        "The total number of FlowFiles removed by this Processor in the last 5 minutes",
+        Formatter.COUNT,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return Long.valueOf(status.getFlowFilesRemoved());
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process (from receipt until this Processor finished processing it) in the past 5 minutes.",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        }, new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                // Weighted mean of the per-snapshot averages; each snapshot is weighted by its FlowFiles removed plus FlowFiles out.
+                long millis = 0L;
+                long count = 0L; // long: the weights are longs, and 'int += long' would narrow silently
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long removed = snapshot.getStatusMetrics().get(FLOWFILES_REMOVED.getDescriptor()).longValue();
+                    final long output = snapshot.getStatusMetrics().get(OUTPUT_COUNT.getDescriptor()).longValue();
+                    final long processed = removed + output;
+                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    // Weight the numerator by the same count that feeds the denominator (it used 'removed' only before).
+                    millis += avgMillis * processed;
+                    count += processed;
+                }
+
+                return count == 0L ? 0L : millis / count;
+            }
+        }
+    )),
+
+    AVERAGE_TASK_MILLIS(new StandardMetricDescriptor<ProcessorStatus>(
+        "averageTaskMillis",
+        "Average Task Duration",
+        "The average duration it took this Processor to complete a task, as averaged over the past 5 minutes",
+        Formatter.DURATION,
+        new ValueMapper<ProcessorStatus>() {
+            @Override
+            public Long getValue(final ProcessorStatus status) {
+                return status.getInvocations() == 0 ? 0 : TimeUnit.MILLISECONDS.convert(status.getProcessingNanos(), TimeUnit.NANOSECONDS) / status.getInvocations();
+            }
+        },
+        new ValueReducer<StatusSnapshot, Long>() {
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                // Average task duration = total task milliseconds / total task count, taken over all snapshots.
+                long procMillis = 0L;
+                long invocations = 0L; // long rather than int so that summing many snapshots' counts cannot overflow
+
+                for (final StatusSnapshot snapshot : values) {
+                    procMillis += snapshot.getStatusMetrics().get(TASK_MILLIS.getDescriptor()).longValue();
+                    invocations += snapshot.getStatusMetrics().get(TASK_COUNT.getDescriptor()).longValue();
+                }
+
+                if (invocations == 0L) { // no tasks in any snapshot: report 0 rather than divide by zero
+                    return 0L;
+                }
+                return procMillis / invocations;
+            }
+        }));
+
+    private final MetricDescriptor<ProcessorStatus> descriptor; // assigned once per constant; final because enum constants are shared
+
+    private ProcessorStatusDescriptor(final MetricDescriptor<ProcessorStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    public MetricDescriptor<ProcessorStatus> getDescriptor() {
+        return descriptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
new file mode 100644
index 0000000..0499d65
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/RemoteProcessGroupStatusDescriptor.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status.history;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.history.MetricDescriptor.Formatter;
+
+/**
+ * Status-history metrics exposed for a Remote Process Group. Each constant wraps a
+ * {@link MetricDescriptor} built from a field name, a label, a description, a
+ * {@link Formatter} and a {@link ValueMapper} that reads the metric's value from a
+ * {@link RemoteProcessGroupStatus}.
+ */
+public enum RemoteProcessGroupStatusDescriptor {
+    SENT_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytes", "Bytes Sent (5 mins)",
+        "The cumulative size of all FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getSentContentSize();
+            }
+        })),
+
+    SENT_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentCount", "FlowFiles Sent (5 mins)",
+        "The number of FlowFiles that have been successfully sent to the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getSentCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytes", "Bytes Received (5 mins)",
+        "The cumulative size of all FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getReceivedContentSize();
+            }
+        })),
+
+    RECEIVED_COUNT(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedCount", "FlowFiles Received (5 mins)",
+        "The number of FlowFiles that have been received from the remote system in the past 5 minutes", Formatter.COUNT, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return Long.valueOf(status.getReceivedCount().longValue());
+            }
+        })),
+
+    RECEIVED_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("receivedBytesPerSecond", "Received Bytes Per Second",
+        "The data rate at which data was received from the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                // 300 = number of seconds in the 5-minute window named in the description.
+                return Long.valueOf(status.getReceivedContentSize().longValue() / 300L);
+            }
+        })),
+
+    SENT_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("sentBytesPerSecond", "Sent Bytes Per Second",
+        "The data rate at which data was sent to the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                // 300 = number of seconds in the 5-minute window named in the description.
+                return Long.valueOf(status.getSentContentSize().longValue() / 300L);
+            }
+        })),
+
+    TOTAL_BYTES_PER_SECOND(new StandardMetricDescriptor<RemoteProcessGroupStatus>("totalBytesPerSecond", "Total Bytes Per Second",
+        "The sum of the send and receive data rate from the remote system in the past 5 minutes in terms of Bytes Per Second",
+        Formatter.DATA_SIZE, new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                // 300 = number of seconds in the 5-minute window named in the description.
+                return Long.valueOf((status.getReceivedContentSize().longValue() + status.getSentContentSize().longValue()) / 300L);
+            }
+        })),
+
+    AVERAGE_LINEAGE_DURATION(new StandardMetricDescriptor<RemoteProcessGroupStatus>(
+        "averageLineageDuration",
+        "Average Lineage Duration (5 mins)",
+        "The average amount of time that a FlowFile took to process from receipt to drop in the past 5 minutes. For Processors that do not terminate FlowFiles, this value will be 0.",
+        Formatter.DURATION,
+        new ValueMapper<RemoteProcessGroupStatus>() {
+            @Override
+            public Long getValue(final RemoteProcessGroupStatus status) {
+                return status.getAverageLineageDuration(TimeUnit.MILLISECONDS);
+            }
+        }, new ValueReducer<StatusSnapshot, Long>() {
+            // Merges the per-snapshot averages into one average, weighting each by that snapshot's SENT_COUNT.
+            @Override
+            public Long reduce(final List<StatusSnapshot> values) {
+                long millis = 0L;
+                // long rather than int: "count += sent" on an int would silently narrow the long operand.
+                long count = 0L;
+
+                for (final StatusSnapshot snapshot : values) {
+                    final long sent = snapshot.getStatusMetrics().get(SENT_COUNT.getDescriptor()).longValue();
+                    count += sent;
+
+                    final long avgMillis = snapshot.getStatusMetrics().get(AVERAGE_LINEAGE_DURATION.getDescriptor()).longValue();
+                    final long totalMillis = avgMillis * sent;
+                    millis += totalMillis;
+                }
+
+                // Nothing was sent in any snapshot: report 0 instead of dividing by zero.
+                return count == 0 ? 0L : millis / count;
+            }
+        }));
+
+    /** The metric descriptor wrapped by this constant. */
+    private final MetricDescriptor<RemoteProcessGroupStatus> descriptor;
+
+    private RemoteProcessGroupStatusDescriptor(final MetricDescriptor<RemoteProcessGroupStatus> descriptor) {
+        this.descriptor = descriptor;
+    }
+
+    /** @return the field name reported by the wrapped descriptor */
+    public String getField() {
+        return descriptor.getField();
+    }
+
+    /** @return the metric descriptor wrapped by this constant */
+    public MetricDescriptor<RemoteProcessGroupStatus> getDescriptor() {
+        return descriptor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/0d3bd2c4/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
index 014b0a6..756e576 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryUtil.java
@@ -44,9 +44,9 @@ public class StatusHistoryUtil {
 
         final StatusHistoryDTO dto = new StatusHistoryDTO();
         dto.setGenerated(new Date());
-        dto.setDetails(componentDetails);
+        dto.setComponentDetails(componentDetails);
         dto.setFieldDescriptors(StatusHistoryUtil.createFieldDescriptorDtos(metricDescriptors));
-        dto.setStatusSnapshots(snapshotDtos);
+        dto.setAggregateSnapshots(snapshotDtos);
         return dto;
     }