You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:42:57 UTC
[nifi] 03/23: NIFI-6510 Initial analytics REST endpoint and
supporting objects
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit e1f5296d049fcab1b94e1f6cc08f0fe57322f9f1
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Jul 15 11:56:55 2019 -0400
NIFI-6510 Initial analytics REST endpoint and supporting objects
---
.../status/analytics/StatusAnalytics.java | 21 +++
.../api/dto/status/ConnectionStatisticsDTO.java | 161 +++++++++++++++++++++
.../status/ConnectionStatisticsSnapshotDTO.java | 149 +++++++++++++++++++
.../NodeConnectionStatisticsSnapshotDTO.java | 78 ++++++++++
.../web/api/entity/ConnectionStatisticsEntity.java | 55 +++++++
.../entity/ConnectionStatisticsSnapshotEntity.java | 76 ++++++++++
.../org/apache/nifi/controller/FlowController.java | 3 +-
.../status/analytics/StatusAnalyticEngine.java | 3 +-
.../org/apache/nifi/web/NiFiServiceFacade.java | 9 ++
.../apache/nifi/web/StandardNiFiServiceFacade.java | 10 ++
.../java/org/apache/nifi/web/api/FlowResource.java | 74 ++++++++++
.../org/apache/nifi/web/api/dto/DtoFactory.java | 28 ++++
.../org/apache/nifi/web/api/dto/EntityFactory.java | 19 +++
.../nifi/web/controller/ControllerFacade.java | 32 ++++
14 files changed, 716 insertions(+), 2 deletions(-)
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
new file mode 100644
index 0000000..d6ad3bc
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalytics.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+/**
+ * Contract for objects that expose analytics derived from flow status information.
+ */
+public interface StatusAnalytics {
+ /**
+ * Returns a time-to-backpressure figure expressed in milliseconds.
+ * NOTE(review): presumably the smallest predicted time across the analyzed connections;
+ * confirm against the implementing class.
+ *
+ * @return the time to backpressure, in milliseconds
+ */
+ long getMinTimeToBackpressureMillis();
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java
new file mode 100644
index 0000000..79ae947
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsDTO.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.util.TimeAdapter;
+
+import javax.xml.bind.annotation.XmlType;
+import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * DTO carrying the statistics of a single connection: identifying information about the connection
+ * and its source/destination, the time the stats were last refreshed, an aggregate snapshot and an
+ * optional list of per-node snapshots.
+ */
+@XmlType(name = "connectionStatistics")
+public class ConnectionStatisticsDTO implements Cloneable {
+    private String id;
+    private String groupId;
+    private String name;
+    private Date statsLastRefreshed;
+
+    private String sourceId;
+    private String sourceName;
+    private String destinationId;
+    private String destinationName;
+
+    private ConnectionStatisticsSnapshotDTO aggregateSnapshot;
+    private List<NodeConnectionStatisticsSnapshotDTO> nodeSnapshots;
+
+    /**
+     * @return the ID of the connection
+     */
+    @ApiModelProperty("The ID of the connection")
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return the ID of the Process Group that the connection belongs to
+     */
+    @ApiModelProperty("The ID of the Process Group that the connection belongs to")
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * @return the name of the connection
+     */
+    @ApiModelProperty("The name of the connection")
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return the ID of the source component
+     */
+    @ApiModelProperty("The ID of the source component")
+    public String getSourceId() {
+        return sourceId;
+    }
+
+    public void setSourceId(String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    /**
+     * @return the name of the source component
+     */
+    @ApiModelProperty("The name of the source component")
+    public String getSourceName() {
+        return sourceName;
+    }
+
+    public void setSourceName(String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    /**
+     * @return the ID of the destination component
+     */
+    @ApiModelProperty("The ID of the destination component")
+    public String getDestinationId() {
+        return destinationId;
+    }
+
+    public void setDestinationId(String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    /**
+     * @return the name of the destination component
+     */
+    @ApiModelProperty("The name of the destination component")
+    public String getDestinationName() {
+        return destinationName;
+    }
+
+    public void setDestinationName(String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    /**
+     * @return the aggregate snapshot, or {@code null} if none has been set
+     */
+    @ApiModelProperty("The status snapshot that represents the aggregate stats of the cluster")
+    public ConnectionStatisticsSnapshotDTO getAggregateSnapshot() {
+        return aggregateSnapshot;
+    }
+
+    public void setAggregateSnapshot(ConnectionStatisticsSnapshotDTO aggregateSnapshot) {
+        this.aggregateSnapshot = aggregateSnapshot;
+    }
+
+    /**
+     * @return the per-node snapshots, or {@code null} if none have been set
+     */
+    @ApiModelProperty("A list of status snapshots for each node")
+    public List<NodeConnectionStatisticsSnapshotDTO> getNodeSnapshots() {
+        return nodeSnapshots;
+    }
+
+    public void setNodeSnapshots(List<NodeConnectionStatisticsSnapshotDTO> nodeSnapshots) {
+        this.nodeSnapshots = nodeSnapshots;
+    }
+
+    /**
+     * @return the timestamp of when the stats were last refreshed
+     */
+    @XmlJavaTypeAdapter(TimeAdapter.class)
+    @ApiModelProperty(
+            value = "The timestamp of when the stats were last refreshed",
+            dataType = "string"
+    )
+    public Date getStatsLastRefreshed() {
+        return statsLastRefreshed;
+    }
+
+    public void setStatsLastRefreshed(Date statsLastRefreshed) {
+        this.statsLastRefreshed = statsLastRefreshed;
+    }
+
+    /**
+     * Creates a deep copy of this DTO. Every field is copied, including the refresh timestamp;
+     * the aggregate snapshot and each node snapshot are cloned. Fields that are {@code null}
+     * on this instance remain {@code null} on the copy instead of causing a
+     * {@link NullPointerException}.
+     *
+     * @return an independent copy of this DTO
+     */
+    @Override
+    public ConnectionStatisticsDTO clone() {
+        final ConnectionStatisticsDTO other = new ConnectionStatisticsDTO();
+        other.setDestinationId(getDestinationId());
+        other.setDestinationName(getDestinationName());
+        other.setGroupId(getGroupId());
+        other.setId(getId());
+        other.setName(getName());
+        other.setSourceId(getSourceId());
+        other.setSourceName(getSourceName());
+
+        // Date is mutable, so hand the copy its own instance.
+        final Date lastRefreshed = getStatsLastRefreshed();
+        other.setStatsLastRefreshed(lastRefreshed == null ? null : new Date(lastRefreshed.getTime()));
+
+        final ConnectionStatisticsSnapshotDTO aggregate = getAggregateSnapshot();
+        other.setAggregateSnapshot(aggregate == null ? null : aggregate.clone());
+
+        // Node snapshots are legitimately null when the per-node breakdown has been pruned.
+        final List<NodeConnectionStatisticsSnapshotDTO> nodeStatuses = getNodeSnapshots();
+        if (nodeStatuses != null) {
+            final List<NodeConnectionStatisticsSnapshotDTO> nodeStatusClones = new ArrayList<>(nodeStatuses.size());
+            for (final NodeConnectionStatisticsSnapshotDTO nodeStatus : nodeStatuses) {
+                nodeStatusClones.add(nodeStatus.clone());
+            }
+            other.setNodeSnapshots(nodeStatusClones);
+        }
+
+        return other;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
new file mode 100644
index 0000000..e914f74
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/ConnectionStatisticsSnapshotDTO.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * Serializable view of one connection's statistics snapshot, including the predicted
+ * time until backpressure is applied.
+ */
+@XmlType(name = "connectionStatisticsSnapshot")
+public class ConnectionStatisticsSnapshotDTO implements Cloneable {
+
+    // identity of the connection
+    private String id;
+    private String groupId;
+    private String name;
+
+    // endpoints of the connection
+    private String sourceId;
+    private String sourceName;
+    private String destinationId;
+    private String destinationName;
+
+    // prediction; starts out at zero
+    private Long predictedMillisUntilBackpressure = 0L;
+
+    /**
+     * @return identifier of the connection
+     */
+    @ApiModelProperty("The id of the connection.")
+    public String getId() {
+        return this.id;
+    }
+
+    public void setId(final String id) {
+        this.id = id;
+    }
+
+    /**
+     * @return identifier of the Process Group that owns the connection
+     */
+    @ApiModelProperty("The id of the process group the connection belongs to.")
+    public String getGroupId() {
+        return this.groupId;
+    }
+
+    public void setGroupId(final String groupId) {
+        this.groupId = groupId;
+    }
+
+    /**
+     * @return the connection's name
+     */
+    @ApiModelProperty("The name of the connection.")
+    public String getName() {
+        return this.name;
+    }
+
+    public void setName(final String name) {
+        this.name = name;
+    }
+
+    /**
+     * @return identifier of the connection's source
+     */
+    @ApiModelProperty("The id of the source of the connection.")
+    public String getSourceId() {
+        return this.sourceId;
+    }
+
+    public void setSourceId(final String sourceId) {
+        this.sourceId = sourceId;
+    }
+
+    /**
+     * @return name of the connection's source
+     */
+    @ApiModelProperty("The name of the source of the connection.")
+    public String getSourceName() {
+        return this.sourceName;
+    }
+
+    public void setSourceName(final String sourceName) {
+        this.sourceName = sourceName;
+    }
+
+    /**
+     * @return identifier of the connection's destination
+     */
+    @ApiModelProperty("The id of the destination of the connection.")
+    public String getDestinationId() {
+        return this.destinationId;
+    }
+
+    public void setDestinationId(final String destinationId) {
+        this.destinationId = destinationId;
+    }
+
+    /**
+     * @return name of the connection's destination
+     */
+    @ApiModelProperty("The name of the destination of the connection.")
+    public String getDestinationName() {
+        return this.destinationName;
+    }
+
+    public void setDestinationName(final String destinationName) {
+        this.destinationName = destinationName;
+    }
+
+    /**
+     * @return predicted milliseconds remaining before backpressure is applied
+     */
+    @ApiModelProperty("The predicted number of milliseconds before the connection will have backpressure applied.")
+    public Long getPredictedMillisUntilBackpressure() {
+        return this.predictedMillisUntilBackpressure;
+    }
+
+    public void setPredictedMillisUntilBackpressure(final Long predictedMillisUntilBackpressure) {
+        this.predictedMillisUntilBackpressure = predictedMillisUntilBackpressure;
+    }
+
+    /**
+     * Produces a field-by-field copy of this snapshot.
+     *
+     * @return a new snapshot holding the same values
+     */
+    @Override
+    public ConnectionStatisticsSnapshotDTO clone() {
+        final ConnectionStatisticsSnapshotDTO copy = new ConnectionStatisticsSnapshotDTO();
+
+        copy.setId(getId());
+        copy.setGroupId(getGroupId());
+        copy.setName(getName());
+
+        copy.setSourceId(getSourceId());
+        copy.setSourceName(getSourceName());
+        copy.setDestinationId(getDestinationId());
+        copy.setDestinationName(getDestinationName());
+
+        copy.setPredictedMillisUntilBackpressure(getPredictedMillisUntilBackpressure());
+        return copy;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java
new file mode 100644
index 0000000..76f94ec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/dto/status/NodeConnectionStatisticsSnapshotDTO.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.web.api.dto.status;
+
+import io.swagger.annotations.ApiModelProperty;
+
+import javax.xml.bind.annotation.XmlType;
+
+/**
+ * DTO pairing a cluster node's identifying information (id, API address and port) with the
+ * connection statistics snapshot reported by that node.
+ */
+@XmlType(name = "nodeConnectionStatisticsSnapshot")
+public class NodeConnectionStatisticsSnapshotDTO implements Cloneable {
+    private String nodeId;
+    private String address;
+    private Integer apiPort;
+
+    private ConnectionStatisticsSnapshotDTO statisticsSnapshot;
+
+    /**
+     * @return the unique ID that identifies the node
+     */
+    @ApiModelProperty("The unique ID that identifies the node")
+    public String getNodeId() {
+        return nodeId;
+    }
+
+    public void setNodeId(String nodeId) {
+        this.nodeId = nodeId;
+    }
+
+    /**
+     * @return the API address of the node
+     */
+    @ApiModelProperty("The API address of the node")
+    public String getAddress() {
+        return address;
+    }
+
+    public void setAddress(String address) {
+        this.address = address;
+    }
+
+    /**
+     * @return the API port used to communicate with the node
+     */
+    @ApiModelProperty("The API port used to communicate with the node")
+    public Integer getApiPort() {
+        return apiPort;
+    }
+
+    public void setApiPort(Integer apiPort) {
+        this.apiPort = apiPort;
+    }
+
+    /**
+     * @return the connection statistics snapshot from the node, or {@code null} if none has been set
+     */
+    @ApiModelProperty("The connection statistics snapshot from the node.")
+    public ConnectionStatisticsSnapshotDTO getStatisticsSnapshot() {
+        return statisticsSnapshot;
+    }
+
+    public void setStatisticsSnapshot(ConnectionStatisticsSnapshotDTO statisticsSnapshot) {
+        this.statisticsSnapshot = statisticsSnapshot;
+    }
+
+    /**
+     * Creates a deep copy of this DTO. The wrapped statistics snapshot is cloned when present
+     * and left {@code null} otherwise, so copying a partially populated DTO does not throw.
+     *
+     * @return an independent copy of this DTO
+     */
+    @Override
+    public NodeConnectionStatisticsSnapshotDTO clone() {
+        final NodeConnectionStatisticsSnapshotDTO other = new NodeConnectionStatisticsSnapshotDTO();
+        other.setNodeId(getNodeId());
+        other.setAddress(getAddress());
+        other.setApiPort(getApiPort());
+
+        final ConnectionStatisticsSnapshotDTO snapshot = getStatisticsSnapshot();
+        other.setStatisticsSnapshot(snapshot == null ? null : snapshot.clone());
+        return other;
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java
new file mode 100644
index 0000000..781cff6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsEntity.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import org.apache.nifi.web.api.dto.ReadablePermission;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+
+import javax.xml.bind.annotation.XmlRootElement;
+
+/**
+ * Entity wrapper around a {@link ConnectionStatisticsDTO}. A serialized form of this class can be used as
+ * the entity body of a request or response to or from the API; it also carries the read permission flag.
+ */
+@XmlRootElement(name = "connectionStatisticsEntity")
+public class ConnectionStatisticsEntity extends Entity implements ReadablePermission {
+
+    private Boolean canRead;
+    private ConnectionStatisticsDTO connectionStatistics;
+
+    /**
+     * @return the wrapped ConnectionStatisticsDTO that is being serialized
+     */
+    public ConnectionStatisticsDTO getConnectionStatistics() {
+        return this.connectionStatistics;
+    }
+
+    public void setConnectionStatistics(final ConnectionStatisticsDTO connectionStatistics) {
+        this.connectionStatistics = connectionStatistics;
+    }
+
+    @Override
+    public Boolean getCanRead() {
+        return this.canRead;
+    }
+
+    @Override
+    public void setCanRead(final Boolean canRead) {
+        this.canRead = canRead;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
new file mode 100644
index 0000000..da7e5ca
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-client-dto/src/main/java/org/apache/nifi/web/api/entity/ConnectionStatisticsSnapshotEntity.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.web.api.entity;
+
+import io.swagger.annotations.ApiModelProperty;
+import org.apache.nifi.web.api.dto.ReadablePermission;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
+
+/**
+ * A serialized representation of this class can be placed in the entity body of a request or response to or from the API.
+ * This particular entity holds a reference to a ConnectionStatisticsSnapshotDTO, together with the
+ * connection id and the read permission flag.
+ */
+public class ConnectionStatisticsSnapshotEntity extends Entity implements ReadablePermission, Cloneable {
+    private String id;
+    private ConnectionStatisticsSnapshotDTO connectionStatisticsSnapshot;
+    private Boolean canRead;
+
+    /**
+     * @return The connection id
+     */
+    @ApiModelProperty("The id of the connection.")
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    /**
+     * The ConnectionStatisticsSnapshotDTO that is being serialized.
+     *
+     * @return The ConnectionStatisticsSnapshotDTO object, or {@code null} if none has been set
+     */
+    public ConnectionStatisticsSnapshotDTO getConnectionStatisticsSnapshot() {
+        return connectionStatisticsSnapshot;
+    }
+
+    /**
+     * @param connectionStatusSnapshot the statistics snapshot to wrap (the parameter name is
+     *                                 historical; the value is a statistics snapshot)
+     */
+    public void setConnectionStatisticsSnapshot(ConnectionStatisticsSnapshotDTO connectionStatusSnapshot) {
+        this.connectionStatisticsSnapshot = connectionStatusSnapshot;
+    }
+
+    @Override
+    public Boolean getCanRead() {
+        return canRead;
+    }
+
+    @Override
+    public void setCanRead(Boolean canRead) {
+        this.canRead = canRead;
+    }
+
+    /**
+     * Creates a copy of this entity. The id and read permission are copied, and the wrapped
+     * snapshot is cloned when present; a {@code null} snapshot stays {@code null} on the copy.
+     *
+     * @return an independent copy of this entity
+     */
+    @Override
+    public ConnectionStatisticsSnapshotEntity clone() {
+        final ConnectionStatisticsSnapshotEntity other = new ConnectionStatisticsSnapshotEntity();
+        other.setId(this.getId());
+        other.setCanRead(this.getCanRead());
+
+        final ConnectionStatisticsSnapshotDTO snapshot = this.getConnectionStatisticsSnapshot();
+        other.setConnectionStatisticsSnapshot(snapshot == null ? null : snapshot.clone());
+
+        return other;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index f7ed734..0c422b4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -118,6 +118,7 @@ import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -602,7 +603,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
- StatusAnalyticEngine analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+ StatusAnalytics analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
@Override
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
index 0602a93..9231707 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -31,7 +31,7 @@ import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StatusAnalyticEngine {
+public class StatusAnalyticEngine implements StatusAnalytics {
private ComponentStatusRepository statusRepository;
private FlowController controller;
@@ -42,6 +42,7 @@ public class StatusAnalyticEngine {
this.statusRepository = statusRepository;
}
+ @Override
public long getMinTimeToBackpressureMillis() {
ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
List<Connection> allConnections = rootGroup.findAllConnections();
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
index f62bec2..9ed8808 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/NiFiServiceFacade.java
@@ -87,6 +87,7 @@ import org.apache.nifi.web.api.entity.BucketEntity;
import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -662,6 +663,14 @@ public interface NiFiServiceFacade {
StatusHistoryEntity getConnectionStatusHistory(String connectionId);
/**
+ * Gets analytical statistics for the specified connection.
+ *
+ * @param connectionId the id of the connection whose statistics are requested
+ * @return an entity wrapping the statistics of that connection
+ */
+ ConnectionStatisticsEntity getConnectionStatistics(String connectionId);
+
+ /**
* Creates a new Relationship target.
*
* @param revision revision
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
index 5ba1da6..f2e7608 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/StandardNiFiServiceFacade.java
@@ -216,6 +216,7 @@ import org.apache.nifi.web.api.dto.provenance.ProvenanceEventDTO;
import org.apache.nifi.web.api.dto.provenance.ProvenanceOptionsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.LineageDTO;
import org.apache.nifi.web.api.dto.search.SearchResultsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ControllerStatusDTO;
import org.apache.nifi.web.api.dto.status.NodeProcessGroupStatusSnapshotDTO;
@@ -235,6 +236,7 @@ import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -3191,6 +3193,14 @@ public class StandardNiFiServiceFacade implements NiFiServiceFacade {
return entityFactory.createStatusHistoryEntity(dto, permissions);
}
+    /**
+     * Looks up the connection, derives its permissions, and wraps the connection's analytics
+     * statistics in an entity.
+     */
+    @Override
+    public ConnectionStatisticsEntity getConnectionStatistics(final String connectionId) {
+        // Resolve the connection and its permissions first, exactly as before.
+        final Connection target = connectionDAO.getConnection(connectionId);
+        final PermissionsDTO readPermissions = dtoFactory.createPermissionsDto(target);
+
+        return entityFactory.createConnectionStatisticsEntity(
+                dtoFactory.createConnectionStatisticsDto(controllerFacade.getConnectionStatistics(connectionId)),
+                readPermissions);
+    }
+
private ProcessorEntity createProcessorEntity(final ProcessorNode processor, final NiFiUser user) {
final RevisionDTO revision = dtoFactory.createRevisionDTO(revisionManager.getRevision(processor.getIdentifier()));
final PermissionsDTO permissions = dtoFactory.createPermissionsDto(processor, user);
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
index 4b34f39..6f28d44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/FlowResource.java
@@ -75,6 +75,7 @@ import org.apache.nifi.web.api.entity.BulletinBoardEntity;
import org.apache.nifi.web.api.entity.ClusteSummaryEntity;
import org.apache.nifi.web.api.entity.ClusterSearchResultsEntity;
import org.apache.nifi.web.api.entity.ComponentHistoryEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ControllerBulletinsEntity;
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
@@ -2074,6 +2075,79 @@ public class FlowResource extends ApplicationResource {
return generateOkResponse(entity).build();
}
+    /**
+     * Retrieves the statistics for the specified connection.
+     *
+     * When the request is replicated across the cluster and no specific node is targeted, the merged
+     * response has its per-node breakdown removed unless a nodewise response was requested.
+     *
+     * @param nodewise Whether or not to include the breakdown per node.
+     * @param clusterNodeId The id of the node to direct the request at, if any.
+     * @param id The id of the connection whose statistics are retrieved.
+     * @return A ConnectionStatisticsEntity.
+     * @throws InterruptedException if interrupted
+     * @throws IllegalArgumentException if a nodewise request is directed at a specific node
+     */
+    @GET
+    @Consumes(MediaType.WILDCARD)
+    @Produces(MediaType.APPLICATION_JSON)
+    @Path("connections/{id}/statistics")
+    @ApiOperation(
+            value = "Gets statistics for a connection",
+            response = ConnectionStatisticsEntity.class,
+            authorizations = {
+                    @Authorization(value = "Read - /flow")
+            }
+    )
+    @ApiResponses(
+            value = {
+                    @ApiResponse(code = 400, message = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
+                    @ApiResponse(code = 401, message = "Client could not be authenticated."),
+                    @ApiResponse(code = 403, message = "Client is not authorized to make this request."),
+                    @ApiResponse(code = 404, message = "The specified resource could not be found."),
+                    @ApiResponse(code = 409, message = "The request was valid but NiFi was not in the appropriate state to process it. Retrying the same request later may be successful.")
+            }
+    )
+    public Response getConnectionStatistics(
+            @ApiParam(
+                    value = "Whether or not to include the breakdown per node. Optional, defaults to false",
+                    required = false
+            )
+            @QueryParam("nodewise") @DefaultValue(NODEWISE) Boolean nodewise,
+            @ApiParam(
+                    value = "The id of the node where to get the statistics.",
+                    required = false
+            )
+            @QueryParam("clusterNodeId") String clusterNodeId,
+            @ApiParam(
+                    value = "The connection id.",
+                    required = true
+            )
+            @PathParam("id") String id) throws InterruptedException {
+
+        authorizeFlow();
+
+        // Evaluate the flag once, null-safely, so a missing value is treated as "not nodewise"
+        // in both checks below rather than failing on auto-unboxing.
+        final boolean includeNodeBreakdown = Boolean.TRUE.equals(nodewise);
+
+        // ensure a valid request
+        if (includeNodeBreakdown && clusterNodeId != null) {
+            throw new IllegalArgumentException("Nodewise requests cannot be directed at a specific node.");
+        }
+
+        if (isReplicateRequest()) {
+            // determine where this request should be sent
+            if (clusterNodeId == null) {
+                final NodeResponse nodeResponse = replicateNodeResponse(HttpMethod.GET);
+                final ConnectionStatisticsEntity entity = (ConnectionStatisticsEntity) nodeResponse.getUpdatedEntity();
+
+                // ensure there is an updated entity (result of merging) and prune the response as necessary
+                if (entity != null && !includeNodeBreakdown && entity.getConnectionStatistics() != null) {
+                    entity.getConnectionStatistics().setNodeSnapshots(null);
+                }
+
+                return nodeResponse.getResponse();
+            } else {
+                return replicate(HttpMethod.GET, clusterNodeId);
+            }
+        }
+
+        // get the specified connection statistics
+        final ConnectionStatisticsEntity entity = serviceFacade.getConnectionStatistics(id);
+        return generateOkResponse(entity).build();
+    }
+
// --------------
// status history
// --------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
index 97b8c5e..6903e44 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/DtoFactory.java
@@ -105,6 +105,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
import org.apache.nifi.diagnostics.GarbageCollection;
@@ -198,6 +199,8 @@ import org.apache.nifi.web.api.dto.provenance.lineage.LineageRequestDTO.LineageR
import org.apache.nifi.web.api.dto.provenance.lineage.LineageResultsDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceLinkDTO;
import org.apache.nifi.web.api.dto.provenance.lineage.ProvenanceNodeDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.PortStatusDTO;
@@ -1186,6 +1189,31 @@ public final class DtoFactory {
return connectionStatusDto;
}
+ /**
+ * Creates a ConnectionStatisticsDTO for the specified connection analytics.
+ *
+ * @param connectionStatistics the analytics for a connection
+ * @return the dto, carrying an aggregate snapshot built from the same analytics
+ */
+ public ConnectionStatisticsDTO createConnectionStatisticsDto(final StatusAnalytics connectionStatistics) {
+ // identify the connection and its endpoints
+ final ConnectionStatisticsDTO dto = new ConnectionStatisticsDTO();
+ dto.setGroupId(connectionStatistics.getGroupId());
+ dto.setId(connectionStatistics.getId());
+ dto.setName(connectionStatistics.getName());
+ dto.setSourceId(connectionStatistics.getSourceId());
+ dto.setSourceName(connectionStatistics.getSourceName());
+ dto.setDestinationId(connectionStatistics.getDestinationId());
+ dto.setDestinationName(connectionStatistics.getDestinationName());
+
+ // the refresh time is the moment this dto is assembled
+ dto.setStatsLastRefreshed(new Date());
+
+ // the aggregate snapshot repeats the identifying fields and carries the prediction
+ final ConnectionStatisticsSnapshotDTO aggregate = new ConnectionStatisticsSnapshotDTO();
+ aggregate.setId(connectionStatistics.getId());
+ aggregate.setGroupId(connectionStatistics.getGroupId());
+ aggregate.setName(connectionStatistics.getName());
+ aggregate.setSourceName(connectionStatistics.getSourceName());
+ aggregate.setDestinationName(connectionStatistics.getDestinationName());
+ aggregate.setPredictedMillisUntilBackpressure(connectionStatistics.getMinTimeToBackpressureMillis());
+
+ dto.setAggregateSnapshot(aggregate);
+ return dto;
+ }
+
public ProcessorStatusDTO createProcessorStatusDto(final ProcessorStatus procStatus) {
final ProcessorStatusDTO dto = new ProcessorStatusDTO();
dto.setId(procStatus.getId());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
index 915ad2c..1db0adc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/api/dto/EntityFactory.java
@@ -20,6 +20,8 @@ import org.apache.nifi.web.api.dto.action.ActionDTO;
import org.apache.nifi.web.api.dto.diagnostics.ProcessorDiagnosticsDTO;
import org.apache.nifi.web.api.dto.flow.FlowBreadcrumbDTO;
import org.apache.nifi.web.api.dto.flow.ProcessGroupFlowDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsDTO;
+import org.apache.nifi.web.api.dto.status.ConnectionStatisticsSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusDTO;
import org.apache.nifi.web.api.dto.status.ConnectionStatusSnapshotDTO;
import org.apache.nifi.web.api.dto.status.ControllerServiceStatusDTO;
@@ -43,6 +45,8 @@ import org.apache.nifi.web.api.entity.BulletinEntity;
import org.apache.nifi.web.api.entity.ComponentReferenceEntity;
import org.apache.nifi.web.api.entity.ComponentValidationResultEntity;
import org.apache.nifi.web.api.entity.ConnectionEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsEntity;
+import org.apache.nifi.web.api.entity.ConnectionStatisticsSnapshotEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusEntity;
import org.apache.nifi.web.api.entity.ConnectionStatusSnapshotEntity;
import org.apache.nifi.web.api.entity.ControllerConfigurationEntity;
@@ -137,6 +141,21 @@ public final class EntityFactory {
return entity;
}
+ /**
+ * Wraps connection statistics in an entity that also records the read permission.
+ *
+ * @param statistics the connection statistics to wrap
+ * @param permissions the permissions whose read flag is copied onto the entity
+ * @return the entity
+ */
+ public ConnectionStatisticsEntity createConnectionStatisticsEntity(final ConnectionStatisticsDTO statistics, final PermissionsDTO permissions) {
+ final ConnectionStatisticsEntity statisticsEntity = new ConnectionStatisticsEntity();
+ // the statistics are attached unconditionally, as that is always allowed; the read flag only gives permission context for merging responses
+ statisticsEntity.setConnectionStatistics(statistics);
+ statisticsEntity.setCanRead(permissions.getCanRead());
+ return statisticsEntity;
+ }
+
+ /**
+ * Wraps a connection statistics snapshot in an entity that takes its id from the snapshot and records the read permission.
+ *
+ * @param statistics the snapshot to wrap
+ * @param permissions the permissions whose read flag is copied onto the entity
+ * @return the entity
+ */
+ public ConnectionStatisticsSnapshotEntity createConnectionStatisticsSnapshotEntity(final ConnectionStatisticsSnapshotDTO statistics, final PermissionsDTO permissions) {
+ final ConnectionStatisticsSnapshotEntity snapshotEntity = new ConnectionStatisticsSnapshotEntity();
+ snapshotEntity.setId(statistics.getId());
+ // the snapshot is attached unconditionally, as that is always allowed; the read flag only gives permission context for merging responses
+ snapshotEntity.setConnectionStatisticsSnapshot(statistics);
+ snapshotEntity.setCanRead(permissions.getCanRead());
+ return snapshotEntity;
+ }
+
public ProcessGroupStatusEntity createProcessGroupStatusEntity(final ProcessGroupStatusDTO status, final PermissionsDTO permissions) {
final ProcessGroupStatusEntity entity = new ProcessGroupStatusEntity();
entity.setCanRead(permissions.getCanRead());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
index e560516..c1b6754 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-api/src/main/java/org/apache/nifi/web/controller/ControllerFacade.java
@@ -56,6 +56,7 @@ import org.apache.nifi.controller.status.PortStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
+import org.apache.nifi.controller.status.analytics.StatusAnalytics;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.diagnostics.SystemDiagnostics;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
@@ -681,6 +682,37 @@ public class ControllerFacade implements Authorizable {
}
/**
+ * Gets analytical statistics for the specified connection.
+ *
+ * @param connectionId connection id
+ * @return the statistics for the specified connection
+ * @throws ResourceNotFoundException if the connection cannot be found, if no status is available for its
+ * process group, or if no analytics are available for the connection
+ */
+ public StatusAnalytics getConnectionStatistics(final String connectionId) {
+ final ProcessGroup root = getRootGroup();
+ final Connection connection = root.findConnection(connectionId);
+
+ // ensure the connection was found
+ if (connection == null) {
+ throw new ResourceNotFoundException(String.format("Unable to locate connection with id '%s'.", connectionId));
+ }
+
+ // ensure status is available for the connection's group for the current user; the status itself is not used further
+ // NOTE(review): the trailing 1 is presumably the recursion depth for the status lookup - confirm
+ final String groupId = connection.getProcessGroup().getIdentifier();
+ final ProcessGroupStatus processGroupStatus = flowController.getEventAccess().getGroupStatus(groupId, NiFiUserUtils.getNiFiUser(), 1);
+ if (processGroupStatus == null) {
+ throw new ResourceNotFoundException(String.format("Unable to locate group with id '%s'.", groupId));
+ }
+
+ // TODO get from flow controller
+ // Until that lookup exists the placeholder is explicitly null: a local that is read without ever being
+ // assigned is rejected by the compiler, and null sends every call to the not-found response below.
+ final StatusAnalytics status = null;
+ if (status == null) {
+ // the connection was found above, so report the analytics (not the connection) as missing
+ throw new ResourceNotFoundException(String.format("Unable to locate analytics for connection with id '%s'.", connectionId));
+ }
+
+ return status;
+ }
+
+ /**
* Gets the status for the specified input port.
*
* @param portId input port id