You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2022/05/31 20:27:35 UTC

[nifi] branch main updated: NIFI-10072: Adding LoadBalanceStatus to ConnectionStatus (#6086)

This is an automated email from the ASF dual-hosted git repository.

bbende pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 314232ca6c NIFI-10072: Adding LoadBalanceStatus to ConnectionStatus (#6086)
314232ca6c is described below

commit 314232ca6c85f5280ee5319414c3339f1fb851b3
Author: Joe Gresock <jg...@gmail.com>
AuthorDate: Tue May 31 16:27:23 2022 -0400

    NIFI-10072: Adding LoadBalanceStatus to ConnectionStatus (#6086)
---
 .../nifi/controller/status/ConnectionStatus.java   | 12 ++++++++++
 .../nifi/controller/status/LoadBalanceStatus.java  | 26 ++++++++++++++++++++++
 .../nifi/controller/status/ProcessGroupStatus.java | 23 +++++++++++++++++++
 .../apache/nifi/reporting/AbstractEventAccess.java | 13 +++++++++++
 4 files changed, 74 insertions(+)

diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
index 754483e5ed..7033f7a49c 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ConnectionStatus.java
@@ -45,6 +45,7 @@ public class ConnectionStatus implements Cloneable {
     private long totalQueuedDuration;
     private long maxQueuedDuration;
     private FlowFileAvailability flowFileAvailability;
+    private LoadBalanceStatus loadBalanceStatus;
 
     public String getId() {
         return id;
@@ -62,6 +63,14 @@ public class ConnectionStatus implements Cloneable {
         this.groupId = groupId;
     }
 
+    public LoadBalanceStatus getLoadBalanceStatus() {
+        return loadBalanceStatus;
+    }
+
+    public void setLoadBalanceStatus(final LoadBalanceStatus loadBalanceStatus) {
+        this.loadBalanceStatus = loadBalanceStatus;
+    }
+
     public int getQueuedCount() {
         return queuedCount;
     }
@@ -228,6 +237,7 @@ public class ConnectionStatus implements Cloneable {
         final ConnectionStatus clonedObj = new ConnectionStatus();
         clonedObj.groupId = groupId;
         clonedObj.id = id;
+        clonedObj.loadBalanceStatus = loadBalanceStatus;
         clonedObj.inputBytes = inputBytes;
         clonedObj.inputCount = inputCount;
         clonedObj.name = name;
@@ -297,6 +307,8 @@ public class ConnectionStatus implements Cloneable {
         builder.append(totalQueuedDuration);
         builder.append(", maxActiveQueuedDuration=");
         builder.append(maxQueuedDuration);
+        builder.append(", loadBalanceStatus=");
+        builder.append(loadBalanceStatus);
         builder.append("]");
         return builder.toString();
     }
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/LoadBalanceStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/LoadBalanceStatus.java
new file mode 100644
index 0000000000..bd3bed4e75
--- /dev/null
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/LoadBalanceStatus.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.status;
+
+public enum LoadBalanceStatus {
+    LOAD_BALANCE_NOT_CONFIGURED,
+
+    LOAD_BALANCE_ACTIVE,
+
+    LOAD_BALANCE_INACTIVE;
+}
diff --git a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
index 3e4720ef6b..e0fda6dda2 100644
--- a/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
+++ b/nifi-api/src/main/java/org/apache/nifi/controller/status/ProcessGroupStatus.java
@@ -448,6 +448,7 @@ public class ProcessGroupStatus implements Cloneable {
             merged.setOutputCount(merged.getOutputCount() + statusToMerge.getOutputCount());
             merged.setOutputBytes(merged.getOutputBytes() + statusToMerge.getOutputBytes());
             merged.setFlowFileAvailability(mergeFlowFileAvailability(merged.getFlowFileAvailability(), statusToMerge.getFlowFileAvailability()));
+            merged.setLoadBalanceStatus(mergeLoadBalanceStatus(merged.getLoadBalanceStatus(), statusToMerge.getLoadBalanceStatus()));
         }
         target.setConnectionStatus(mergedConnectionMap.values());
 
@@ -611,4 +612,26 @@ public class ProcessGroupStatus implements Cloneable {
 
         return FlowFileAvailability.FLOWFILE_AVAILABLE;
     }
+
+    public static LoadBalanceStatus mergeLoadBalanceStatus(final LoadBalanceStatus statusA, final LoadBalanceStatus statusB) {
+        if (statusA == statusB) {
+            return statusA;
+        }
+        if (statusA == null) {
+            return statusB;
+        }
+        if (statusB == null) {
+            return statusA;
+        }
+
+        if (statusA == LoadBalanceStatus.LOAD_BALANCE_ACTIVE || statusB == LoadBalanceStatus.LOAD_BALANCE_ACTIVE) {
+            return LoadBalanceStatus.LOAD_BALANCE_ACTIVE;
+        }
+
+        if (statusA == LoadBalanceStatus.LOAD_BALANCE_INACTIVE || statusB == LoadBalanceStatus.LOAD_BALANCE_INACTIVE) {
+            return LoadBalanceStatus.LOAD_BALANCE_INACTIVE;
+        }
+
+        return LoadBalanceStatus.LOAD_BALANCE_NOT_CONFIGURED;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 51d62d7d4e..2a0d409fb9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -29,12 +29,15 @@ import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.ProcessorNode;
 import org.apache.nifi.controller.ScheduledState;
 import org.apache.nifi.controller.flow.FlowManager;
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.queue.LoadBalanceStrategy;
 import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.repository.FlowFileEvent;
 import org.apache.nifi.controller.repository.FlowFileEventRepository;
 import org.apache.nifi.controller.repository.RepositoryStatusReport;
 import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
 import org.apache.nifi.controller.status.ConnectionStatus;
+import org.apache.nifi.controller.status.LoadBalanceStatus;
 import org.apache.nifi.controller.status.PortStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
@@ -300,6 +303,16 @@ public abstract class AbstractEventAccess implements EventAccess {
                 connStatus.setQueuedCount(connectionQueuedCount);
             }
 
+            final FlowFileQueue flowFileQueue = conn.getFlowFileQueue();
+            final LoadBalanceStrategy loadBalanceStrategy = flowFileQueue.getLoadBalanceStrategy();
+            if (loadBalanceStrategy == LoadBalanceStrategy.DO_NOT_LOAD_BALANCE) {
+                connStatus.setLoadBalanceStatus(LoadBalanceStatus.LOAD_BALANCE_NOT_CONFIGURED);
+            } else if (flowFileQueue.isActivelyLoadBalancing()) {
+                connStatus.setLoadBalanceStatus(LoadBalanceStatus.LOAD_BALANCE_ACTIVE);
+            } else {
+                connStatus.setLoadBalanceStatus(LoadBalanceStatus.LOAD_BALANCE_INACTIVE);
+            }
+
             if (populateChildStatuses) {
                 connectionStatusCollection.add(connStatus);
             }