You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ai...@apache.org on 2019/08/27 16:42:55 UTC

[nifi] 01/23: NIFI-6510 Implement initial analytic engine

This is an automated email from the ASF dual-hosted git repository.

aichrist pushed a commit to branch analytics-framework
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 4fbdb487334381c36af84a14881bf837409e0f99
Author: Andrew I. Christianson <an...@andyic.org>
AuthorDate: Tue Jul 9 14:15:30 2019 -0400

    NIFI-6510 Implement initial analytic engine
---
 .../org/apache/nifi/controller/FlowController.java | 14 +++++
 .../status/analytics/StatusAnalyticEngine.java     | 60 ++++++++++++++++++++++
 2 files changed, 74 insertions(+)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 331e73e..4c0288f 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -117,6 +117,7 @@ import org.apache.nifi.controller.service.StandardConfigurationContext;
 import org.apache.nifi.controller.service.StandardControllerServiceProvider;
 import org.apache.nifi.controller.state.manager.StandardStateManagerProvider;
 import org.apache.nifi.controller.state.server.ZooKeeperStateServer;
+import org.apache.nifi.controller.status.analytics.StatusAnalyticEngine;
 import org.apache.nifi.controller.status.history.ComponentStatusRepository;
 import org.apache.nifi.controller.status.history.GarbageCollectionHistory;
 import org.apache.nifi.controller.status.history.GarbageCollectionStatus;
@@ -601,6 +602,19 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             }
         }, snapshotMillis, snapshotMillis, TimeUnit.MILLISECONDS);
 
+        StatusAnalyticEngine analyticsEngine = new StatusAnalyticEngine(this, componentStatusRepository);
+
+        timerDrivenEngineRef.get().scheduleWithFixedDelay(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    analyticsEngine.getMinTimeToBackpressure();
+                } catch (final Exception e) {
+                    LOG.error("Failed to capture component stats for Stats History", e);
+                }
+            }
+        }, 1000, 1000, TimeUnit.MILLISECONDS); //FIXME use a real/configured interval
+
         this.connectionStatus = new NodeConnectionStatus(nodeId, DisconnectionCode.NOT_YET_CONNECTED);
         heartbeatBeanRef.set(new HeartbeatBean(rootGroup, false));
 
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
new file mode 100644
index 0000000..8b69ebf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/status/analytics/StatusAnalyticEngine.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.controller.status.analytics;
+
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.FlowController;
+import org.apache.nifi.controller.status.history.ComponentStatusRepository;
+import org.apache.nifi.controller.status.history.StatusHistoryUtil;
+import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.web.api.dto.status.StatusHistoryDTO;
+import org.apache.nifi.web.api.dto.status.StatusSnapshotDTO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Walks the stored status history of every connection in the flow and logs it; no backpressure estimate is derived here. */
+public class StatusAnalyticEngine {
+    private static final Logger LOG = LoggerFactory.getLogger(StatusAnalyticEngine.class);
+    private final ComponentStatusRepository statusRepository;
+    private final FlowController controller;
+
+    public StatusAnalyticEngine(FlowController controller, ComponentStatusRepository statusRepository) {
+        this.controller = controller;
+        this.statusRepository = statusRepository;
+    }
+
+    /** Logs every metric of each aggregate snapshot in the status history of all connections under the root group; always returns 0. */
+    public long getMinTimeToBackpressure() {
+        ProcessGroup rootGroup = controller.getFlowManager().getRootGroup();
+        List<Connection> allConnections = rootGroup.findAllConnections();
+        for (Connection conn : allConnections) {
+            LOG.info("Getting connection history for: {}", conn.getIdentifier());
+            StatusHistoryDTO connHistory = StatusHistoryUtil.createStatusHistoryDTO(
+                    statusRepository.getConnectionStatusHistory(conn.getIdentifier(), null, null, Integer.MAX_VALUE));
+            for (StatusSnapshotDTO snap : connHistory.getAggregateSnapshots()) {
+                for (Entry<String, Long> snapEntry : snap.getStatusMetrics().entrySet()) {
+                    LOG.info("Snap {}: {}", snapEntry.getKey(), snapEntry.getValue());
+                }
+            }
+        }
+        // No time-to-backpressure value is computed from the history; 0 is returned unconditionally.
+        return 0;
+    }
+}