You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/19 13:53:37 UTC
[nifi] 01/21: NIFI-6510 Implement initial analytic engine
This is an automated email from the ASF dual-hosted git repository.
aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit c4cc6c9a8a3d92777435d8559e5eb01e2d395daa
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Tue Jul 9 14:15:30 2019 -0400
NIFI-6510 Implement initial analytic engine
---
.../org/apache/nifi/controller/FlowController.java | 14 +++++
.../status/analytics/StatusAnalyticEngine.java | 60 ++++++++++++++++++++++
2 files changed, 74 insertions(+)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 331e73e..4c0288f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -117,6 +117,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
import org.apache.nifi.controller.service.StandardControllerServiceProvider;
import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
+import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine;
import org.apache.nifi.controller.status.history.ComponentStatusRepository;
import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -601,6 +602,19 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
}
}, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
+ StatusAnalyticEngine analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+
+ timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ analyticsEngine.getMinTimeToBackpressure();
+ } catch (final Exception e) {
+ LOG.error("Failed to capture component stats for Stats History", e);
+ }
+ }
+ }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval
+
this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
new file mode 100644
index 0000000..8b69ebf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Initial analytic engine: reads the status history of every connection under the root group and logs it. */
+public class StatusAnalyticEngine {
+    private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class);
+    private final ComponentStatusRepository statusRepository;
+    private final FlowController controller;
+
+    /** Keeps the controller (used to reach the root group) and the repository (used to read connection history). */
+    public StatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) {
+        this.controller = controller;
+        this.statusRepository = statusRepository;
+    }
+
+    /** Logs every metric of every aggregate snapshot, per connection, at INFO; the result is a placeholder, always 0. */
+    public long getMinTimeToBackpressure() {
+        final ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+        final List<Connection> allConnections = rootGroup.findAllConnections();
+        for (final Connection conn : allConnections) {
+            LOG.info("Getting connection history for: {}", conn.getIdentifier());
+            final StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+                    statusRepository.getConnectionStatusHistory(conn.getIdentifier(), null, null, Integer.MAX_VALUE));
+            for (final StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
+                for (final Entry<String, Long> snapEntry : snap.getStatusMetrics().entrySet()) {
+                    LOG.info("Snap {}: {}", snapEntry.getKey(), snapEntry.getValue());
+                }
+            }
+        }
+        return 0;
+    }
+}