You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:42:57 UTC

[nifi] 03/23: NIFI-6510 Initial analytics REST endpoint and supporting objects

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit e1f5296d049fcab1b94e1f6cc08f0fe57322f9f1
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jul 15 11:56:55 2019 -0400

    NIFI-6510 Initial analytics REST endpoint and supporting objects
---
 .../status/analytics/StatusAnalytics.java          |  21 +++
 .../api/dto/status/ConnectionStatisticsDTO.java    | 161 +++++++++++++++++++++
 .../status/ConnectionStatisticsSnapshotDTO.java    | 149 +++++++++++++++++++
 .../NodeConnectionStatisticsSnapshotDTO.java       |  78 ++++++++++
 .../web/api/entity/ConnectionStatisticsEntity.java |  55 +++++++
 .../entity/ConnectionStatisticsSnapshotEntity.java |  76 ++++++++++
 .../org/apache/nifi/controller/FlowController.java |   3 +-
 .../status/analytics/StatusAnalyticEngine.java     |   3 +-
 .../org/apache/nifi/web/NiFiServiceFacade.java     |   9 ++
 .../apache/nifi/web/StandardNiFiServiceFacade.java |  10 ++
 .../java/org/apache/nifi/web/api/FlowResource.java |  74 ++++++++++
 .../org/apache/nifi/web/api/dto/DtoFactory.java    |  28 ++++
 .../org/apache/nifi/web/api/dto/EntityFactory.java |  19 +++
 .../nifi/web/controller/ControllerFacade.java      |  32 ++++
 14 files changed, 716 insertions(+), 2 deletions(-)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
new file mode 100644
index 0000000..d6ad3bc
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+public interface StatusAnalytics {
+    long getMinTimeToBackpressureMillis();
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java
new file mode 100644
index 0000000..79ae947
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.util.TimeAdapter;
+
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@XmlType(name = "connectionStatistics")
+public class ConnectionStatisticsDTO implements Cloneable {
+    private String id;
+    private String groupId;
+    private String name;
+    private Date statsLastRefreshed;
+
+    private String sourceId;
+    private String sourceName;
+    private String destinationId;
+
+    private String destinationName;
+    private ConnectionStatisticsSnapshotDTO aggregateSnapshot;
+
+    private List<NodeConnectionStatisticsSnapshotDTO> nodeSnapshots;
+
+    @ApiModelProperty("The ID of the connection")
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    @ApiModelProperty("The ID of the Process Group that the connection belongs to")
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    @ApiModelProperty("The name of the connection")
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    @ApiModelProperty("The ID of the source component")
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    @ApiModelProperty("The name of the source component")
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    @ApiModelProperty("The ID of the destination component")
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public void setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    @ApiModelProperty("The name of the destination component")
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    @ApiModelProperty("The status snapshot that represents the aggregate stats of the cluster")
+    public ConnectionStatisticsSnapshotDTO getAggregateSnapshot() {
+        return aggregateSnapshot;
+    }
+
+    public void setAggregateSnapshot(ConnectionStatisticsSnapshotDTO aggregateSnapshot) {
+        this.aggregateSnapshot = aggregateSnapshot;
+    }
+
+    @ApiModelProperty("A list of status snapshots for each node")
+    public List<NodeConnectionStatisticsSnapshotDTO> getNodeSnapshots() {
+        return nodeSnapshots;
+    }
+
+    public void setNodeSnapshots(List<NodeConnectionStatisticsSnapshotDTO> nodeSnapshots) {
+        this.nodeSnapshots = nodeSnapshots;
+    }
+
+    @XmlJavaTypeAdapter(TimeAdapter.class)
+    @ApiModelProperty(
+            value = "The timestamp of when the stats were last refreshed",
+            dataType = "string"
+    )
+    public Date getStatsLastRefreshed() {
+        return statsLastRefreshed;
+    }
+
+    public void setStatsLastRefreshed(Date statsLastRefreshed) {
+        this.statsLastRefreshed = statsLastRefreshed;
+    }
+
+    /** Deep-copies this DTO; the timestamp gets its own Date instance (Date is mutable) and snapshots that were never set stay null in the copy. */
+    @Override
+    public ConnectionStatisticsDTO clone() {
+        final ConnectionStatisticsDTO other = new ConnectionStatisticsDTO();
+        other.setDestinationId(getDestinationId());
+        other.setDestinationName(getDestinationName());
+        other.setGroupId(getGroupId());
+        other.setId(getId());
+        other.setName(getName());
+        other.setSourceId(getSourceId());
+        other.setSourceName(getSourceName());
+        other.setStatsLastRefreshed(statsLastRefreshed == null ? null : new Date(statsLastRefreshed.getTime()));
+        other.setAggregateSnapshot(aggregateSnapshot == null ? null : aggregateSnapshot.clone());
+        if (nodeSnapshots != null) {
+            final List<NodeConnectionStatisticsSnapshotDTO> nodeStatusClones = new ArrayList<>(nodeSnapshots.size());
+            for (final NodeConnectionStatisticsSnapshotDTO nodeStatus : nodeSnapshots) {
+                nodeStatusClones.add(nodeStatus.clone());
+            }
+            other.setNodeSnapshots(nodeStatusClones);
+        }
+        return other;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
new file mode 100644
index 0000000..e914f74
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * DTO for serializing the statistics of a connection.
+ */
+@XmlType(name = "connectionStatisticsSnapshot")
+public class ConnectionStatisticsSnapshotDTO implements Cloneable {
+
+    private String id;
+    private String groupId;
+    private String name;
+
+    private String sourceId;
+    private String sourceName;
+    private String destinationId;
+    private String destinationName;
+
+    private Long predictedMillisUntilBackpressure = 0L;
+
+    /* getters / setters */
+    /**
+     * @return The connection id
+     */
+    @ApiModelProperty("The id of the connection.")
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the ID of the Process Group to which this connection belongs.
+     */
+    @ApiModelProperty("The id of the process group the connection belongs to.")
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(final String groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * @return name of this connection
+     */
+    @ApiModelProperty("The name of the connection.")
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return id of the source of this connection
+     */
+    @ApiModelProperty("The id of the source of the connection.")
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    /**
+     * @return name of the source of this connection
+     */
+    @ApiModelProperty("The name of the source of the connection.")
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    /**
+     * @return id of the destination of this connection
+     */
+    @ApiModelProperty("The id of the destination of the connection.")
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public void setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    /**
+     * @return name of the destination of this connection
+     */
+    @ApiModelProperty("The name of the destination of the connection.")
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied.")
+    public Long getPredictedMillisUntilBackpressure() {
+        return predictedMillisUntilBackpressure;
+    }
+
+    public void setPredictedMillisUntilBackpressure(Long predictedMillisUntilBackpressure) {
+        this.predictedMillisUntilBackpressure = predictedMillisUntilBackpressure;
+    }
+
+    @Override
+    public ConnectionStatisticsSnapshotDTO clone() {
+        final ConnectionStatisticsSnapshotDTO other = new ConnectionStatisticsSnapshotDTO();
+        other.setDestinationId(getDestinationId());
+        other.setDestinationName(getDestinationName());
+        other.setGroupId(getGroupId());
+        other.setId(getId());
+        other.setName(getName());
+        other.setSourceId(getSourceId());
+        other.setSourceName(getSourceName());
+
+        other.setPredictedMillisUntilBackpressure(getPredictedMillisUntilBackpressure());
+
+        return other;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java
new file mode 100644
index 0000000..76f94ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+@XmlType(name = "nodeConnectionStatisticsSnapshot")
+public class NodeConnectionStatisticsSnapshotDTO implements Cloneable {
+    private String nodeId;
+    private String address;
+    private Integer apiPort;
+
+    private ConnectionStatisticsSnapshotDTO statisticsSnapshot;
+
+    @ApiModelProperty("The unique ID that identifies the node")
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    @ApiModelProperty("The API address of the node")
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    @ApiModelProperty("The API port used to communicate with the node")
+    public Integer getApiPort() {
+        return apiPort;
+    }
+
+    public void setApiPort(Integer apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    @ApiModelProperty("The connection statistics snapshot from the node.")
+    public ConnectionStatisticsSnapshotDTO getStatisticsSnapshot() {
+        return statisticsSnapshot;
+    }
+
+    public void setStatisticsSnapshot(ConnectionStatisticsSnapshotDTO statisticsSnapshot) {
+        this.statisticsSnapshot = statisticsSnapshot;
+    }
+
+    /** Copies the node details and clones the statistics snapshot; an unset snapshot stays null in the copy. */
+    @Override
+    public NodeConnectionStatisticsSnapshotDTO clone() {
+        final NodeConnectionStatisticsSnapshotDTO other = new NodeConnectionStatisticsSnapshotDTO();
+        other.setNodeId(getNodeId());
+        other.setAddress(getAddress());
+        other.setApiPort(getApiPort());
+        other.setStatisticsSnapshot(statisticsSnapshot == null ? null : statisticsSnapshot.clone());
+        return other;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java
new file mode 100644
index 0000000..781cff6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.ReadablePermission;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API. This particular entity holds a reference to a ConnectionStatisticsDTO.
+ */
+@XmlRootElement(name = "connectionStatisticsEntity")
+public class ConnectionStatisticsEntity extends Entity implements ReadablePermission {
+
+    private ConnectionStatisticsDTO connectionStatistics;
+    private Boolean canRead;
+
+    /**
+     * The ConnectionStatisticsDTO that is being serialized.
+     *
+     * @return The ConnectionStatisticsDTO object
+     */
+    public ConnectionStatisticsDTO getConnectionStatistics() {
+        return connectionStatistics;
+    }
+
+    public void setConnectionStatistics(ConnectionStatisticsDTO connectionStatistics) {
+        this.connectionStatistics = connectionStatistics;
+    }
+
+    @Override
+    public Boolean getCanRead() {
+        return canRead;
+    }
+
+    @Override
+    public void setCanRead(Boolean canRead) {
+        this.canRead = canRead;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
new file mode 100644
index 0000000..da7e5ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.ReadablePermission;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API.
+ * This particular entity holds a reference to a ConnectionStatisticsSnapshotDTO.
+ */
+public class ConnectionStatisticsSnapshotEntity extends Entity implements ReadablePermission, Cloneable {
+    private String id;
+    private ConnectionStatisticsSnapshotDTO connectionStatisticsSnapshot;
+    private Boolean canRead;
+
+    /**
+     * @return The connection id
+     */
+    @ApiModelProperty("The id of the connection.")
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * The ConnectionStatisticsSnapshotDTO that is being serialized.
+     *
+     * @return The ConnectionStatisticsSnapshotDTO object
+     */
+    public ConnectionStatisticsSnapshotDTO getConnectionStatisticsSnapshot() {
+        return connectionStatisticsSnapshot;
+    }
+
+    public void setConnectionStatisticsSnapshot(ConnectionStatisticsSnapshotDTO connectionStatusSnapshot) {
+        this.connectionStatisticsSnapshot = connectionStatusSnapshot;
+    }
+
+    @Override
+    public Boolean getCanRead() {
+        return canRead;
+    }
+
+    @Override
+    public void setCanRead(Boolean canRead) {
+        this.canRead = canRead;
+    }
+
+    @Override
+    public ConnectionStatisticsSnapshotEntity clone() { // copies id and canRead, clones the snapshot (stays null when unset)
+        final ConnectionStatisticsSnapshotEntity other = new ConnectionStatisticsSnapshotEntity();
+        other.setId(getId()); // id is declared on this entity, so the copy has to carry it too
+        other.setCanRead(getCanRead());
+        other.setConnectionStatisticsSnapshot(connectionStatisticsSnapshot == null ? null : connectionStatisticsSnapshot.clone());
+        return other;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index f7ed734..0c422b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -118,6 +118,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
 import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -602,7 +603,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
-        StatusAnalyticEngine analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+        StatusAnalytics analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
 
         timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
             @Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 0602a93..9231707 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class StatusAnalyticEngine {
+public class StatusAnalyticEngine implements StatusAnalytics {
     private ComponentStatusRepository statusRepository;
     private FlowController controller;
 
@@ -42,6 +42,7 @@ public class StatusAnalyticEngine {
         this.statusRepository = statusRepository;
     }
 
+    @Override
     public long getMinTimeToBackpressureMillis() {
         ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
         List<Connection> allConnections = rootGroup.findAllConnections();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index f62bec2..9ed8808 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -87,6 +87,7 @@ import org.apache.nifi.web.api.entity.BucketEntity;
 import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -662,6 +663,14 @@ public interface NiFiServiceFacade {
     StatusHistoryEntity getConnectionStatusHistory(String connectionId);
 
     /**
+     * Gets analytical statistics for the specified connection.
+     *
+     * @param connectionId connection
+     * @return statistics
+     */
+    ConnectionStatisticsEntity getConnectionStatistics(String connectionId);
+
+    /**
      * Creates a new Relationship target.
      *
      * @param revision revision
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 5ba1da6..f2e7608 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -216,6 +216,7 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
 import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
 import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
 import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
@@ -235,6 +236,7 @@ import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -3191,6 +3193,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
         return entityFactory.createStatusHistoryEntity(dto, permissions);
     }
 
+    @Override
+    public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) {
+        final Connection connection = connectionDAO.getConnection(connectionId);
+        final PermissionsDTO permissions = dtoFactory.createPermissionsDto(connection);
+        final ConnectionStatisticsDTO dto = dtoFactory.createConnectionStatisticsDto(controllerFacade.getConnectionStatistics(connectionId));
+        return entityFactory.createConnectionStatisticsEntity(dto, permissions);
+    }
+
     private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
         final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
         final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 4b34f39..6f28d44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -75,6 +75,7 @@ import org.apache.nifi.web.api.entity.BulletinBoardEntity;
 import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
 import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
 import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
 import org.apache.nifi.web.api.entity.ControllerServiceEntity;
@@ -2074,6 +2075,79 @@ public class FlowResource extends ApplicationResource {
         return generateOkResponse(entity).build();
     }
 
+    /**
+     * Retrieves the statistics for the specified connection.
+     *
+     * @param nodewise Whether to include the per-node breakdown; cannot be combined with clusterNodeId.
+     * @param clusterNodeId The id of the node the request should be directed to, or null for none.
+     * @param id The id of the connection whose statistics are retrieved.
+     * @return A ConnectionStatisticsEntity.
+     * @throws InterruptedException if interrupted
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("connections/{id}/statistics")
+    @ApiOperation(
+            value = "Gets statistics for a connection",
+            response = ConnectionStatisticsEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read - /flow")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getConnectionStatistics(
+            @ApiParam(
+                    value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+                    required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                    value = "The id of the node where to get the statistics.",
+                    required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId,
+            @ApiParam(
+                    value = "The connection id.",
+                    required = true
+            )
+            @PathParam("id") String id) throws InterruptedException {
+
+        authorizeFlow();
+
+        // ensure a valid request
+        if (Boolean.TRUE.equals(nodewise) && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (isReplicateRequest()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
+                final ConnectionStatisticsEntity entity = (ConnectionStatisticsEntity) nodeResponse.getUpdatedEntity();
+                // ensure there is an updated entity (result of merging) and prune the response as necessary;
+                // the null-safe comparison mirrors the validation above and avoids unboxing a null Boolean
+                if (entity != null && !Boolean.TRUE.equals(nodewise)) {
+                    entity.getConnectionStatistics().setNodeSnapshots(null);
+                }
+                return nodeResponse.getResponse();
+            } else {
+                return replicate(HttpMethod.GET, clusterNodeId);
+            }
+        }
+
+        // get the specified connection statistics
+        final ConnectionStatisticsEntity entity = serviceFacade.getConnectionStatistics(id);
+        return generateOkResponse(entity).build();
+    }
+
     // --------------
     // status history
     // --------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 97b8c5e..6903e44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -105,6 +105,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
 import org.apache.nifi.diagnostics.GarbageCollection;
@@ -198,6 +199,8 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageR
 import org.apache.nifi.web.api.dto.provenance.lineage.LineageResultsDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceLinkDTO;
 import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceNodeDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.PortStatusDTO;
@@ -1186,6 +1189,31 @@ public final class DtoFactory {
         return connectionStatusDto;
     }
 
+    /** Builds the statistics DTO, together with its aggregate snapshot, from the given connection analytics. */
+    public ConnectionStatisticsDTO createConnectionStatisticsDto(final StatusAnalytics connectionStatistics) {
+        final ConnectionStatisticsDTO statisticsDto = new ConnectionStatisticsDTO();
+        statisticsDto.setGroupId(connectionStatistics.getGroupId());
+        statisticsDto.setId(connectionStatistics.getId());
+        statisticsDto.setName(connectionStatistics.getName());
+        statisticsDto.setSourceId(connectionStatistics.getSourceId());
+        statisticsDto.setSourceName(connectionStatistics.getSourceName());
+        statisticsDto.setDestinationId(connectionStatistics.getDestinationId());
+        statisticsDto.setDestinationName(connectionStatistics.getDestinationName());
+        statisticsDto.setStatsLastRefreshed(new Date()); // refresh time is the moment this DTO is created
+
+        // the aggregate snapshot repeats the identifying fields and carries the backpressure prediction
+        final ConnectionStatisticsSnapshotDTO aggregate = new ConnectionStatisticsSnapshotDTO();
+        aggregate.setId(connectionStatistics.getId());
+        aggregate.setGroupId(connectionStatistics.getGroupId());
+        aggregate.setName(connectionStatistics.getName());
+        aggregate.setSourceName(connectionStatistics.getSourceName());
+        aggregate.setDestinationName(connectionStatistics.getDestinationName());
+        aggregate.setPredictedMillisUntilBackpressure(connectionStatistics.getMinTimeToBackpressureMillis());
+        statisticsDto.setAggregateSnapshot(aggregate);
+
+        return statisticsDto;
+    }
+
     public ProcessorStatusDTO createProcessorStatusDto(final ProcessorStatus procStatus) {
         final ProcessorStatusDTO dto = new ProcessorStatusDTO();
         dto.setId(procStatus.getId());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 915ad2c..1db0adc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -20,6 +20,8 @@ import org.apache.nifi.web.api.dto.action.ActionDTO;
 import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
 import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO;
 import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
 import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
 import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
@@ -43,6 +45,8 @@ import org.apache.nifi.web.api.entity.BulletinEntity;
 import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
 import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
 import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsSnapshotEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
 import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
 import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -137,6 +141,21 @@ public final class EntityFactory {
         return entity;
     }
 
+    public ConnectionStatisticsEntity createConnectionStatisticsEntity(final ConnectionStatisticsDTO statistics, final PermissionsDTO permissions) {
+        final ConnectionStatisticsEntity statisticsEntity = new ConnectionStatisticsEntity();
+        statisticsEntity.setConnectionStatistics(statistics); // set unconditionally: returning the statistics is always allowed
+        statisticsEntity.setCanRead(permissions.getCanRead()); // only supplies the permission context used when merging responses
+        return statisticsEntity;
+    }
+
+    public ConnectionStatisticsSnapshotEntity createConnectionStatisticsSnapshotEntity(final ConnectionStatisticsSnapshotDTO statistics, final PermissionsDTO permissions) {
+        final ConnectionStatisticsSnapshotEntity snapshotEntity = new ConnectionStatisticsSnapshotEntity();
+        snapshotEntity.setId(statistics.getId());
+        snapshotEntity.setConnectionStatisticsSnapshot(statistics); // set unconditionally: returning the statistics is always allowed
+        snapshotEntity.setCanRead(permissions.getCanRead()); // only supplies the permission context used when merging responses
+        return snapshotEntity;
+    }
+
     public ProcessGroupStatusEntity createProcessGroupStatusEntity(final ProcessGroupStatusDTO status, final PermissionsDTO permissions) {
         final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity();
         entity.setCanRead(permissions.getCanRead());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index e560516..c1b6754 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -56,6 +56,7 @@ import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.diagnostics.SystemDiagnostics;
 import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -681,6 +682,37 @@ public class ControllerFacade implements Authorizable {
     }
 
     /**
+     * Gets analytical statistics for the specified connection.
+     *
+     * @param connectionId connection id
+     * @return the statistics for the specified connection
+     * @throws ResourceNotFoundException if the connection, its group status, or its analytics cannot be located
+     */
+    public StatusAnalytics getConnectionStatistics(final String connectionId) {
+        final Connection connection = getRootGroup().findConnection(connectionId);
+
+        // ensure the connection was found
+        if (connection == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
+        }
+
+        // calculate the process group status
+        final String groupId = connection.getProcessGroup().getIdentifier();
+        final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+        if (processGroupStatus == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+        }
+
+        // TODO get from flow controller; explicitly null until then so the local is definitely assigned and this compiles
+        final StatusAnalytics status = null;
+        if (status == null) {
+            throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
+        }
+
+        return status;
+    }
+
+    /**
      * Gets the status for the specified input port.
      *
      * @param portId input port id