You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/08/17 18:06:06 UTC

[nifi] branch main updated: NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 2685856c62 NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)
2685856c62 is described below

commit 2685856c629aa1bda20019981ed932ccecf9415a
Author: Hsin-Ying Lee <s9...@gmail.com>
AuthorDate: Thu Aug 18 02:05:55 2022 +0800

    NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)
---
 .../apache/nifi/groups/StandardProcessGroup.java   | 22 +++++++++++++--
 .../java/org/apache/nifi/groups/ProcessGroup.java  |  6 ++++
 .../org/apache/nifi/controller/FlowController.java | 32 ++--------------------
 .../controller/service/mock/MockProcessGroup.java  |  6 ++++
 .../nifi/web/revision/NaiveRevisionManager.java    |  2 +-
 5 files changed, 36 insertions(+), 32 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 19d4fb2046..3e8ee1ea38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -182,8 +182,8 @@ public final class StandardProcessGroup implements ProcessGroup {
 
     private final Map<String, Port> inputPorts = new HashMap<>();
     private final Map<String, Port> outputPorts = new HashMap<>();
-    private final Map<String, Connection> connections = new HashMap<>();
-    private final Map<String, ProcessGroup> processGroups = new HashMap<>();
+    private final Map<String, Connection> connections = new ConcurrentHashMap<>();
+    private final Map<String, ProcessGroup> processGroups = new ConcurrentHashMap<>();
     private final Map<String, Label> labels = new HashMap<>();
     private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
     private final Map<String, ProcessorNode> processors = new HashMap<>();
@@ -4255,6 +4255,24 @@ public final class StandardProcessGroup implements ProcessGroup {
         }
     }
 
+    @Override
+    public QueueSize getQueueSize() {
+        // Sums queued FlowFiles over child groups (recursively) and this group's own connections; takes no lock in this method.
+        int totalObjects = 0;
+        long totalBytes = 0L;
+        for (final ProcessGroup child : processGroups.values()) {
+            final QueueSize childSize = child.getQueueSize();
+            totalObjects += childSize.getObjectCount();
+            totalBytes += childSize.getByteCount();
+        }
+        for (final Connection conn : connections.values()) {
+            final QueueSize connectionSize = conn.getFlowFileQueue().size();
+            totalObjects += connectionSize.getObjectCount();
+            totalBytes += connectionSize.getByteCount();
+        }
+        return new QueueSize(totalObjects, totalBytes);
+    }
+
     @Override
     public String getDefaultBackPressureDataSizeThreshold() {
         // Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 7579b721a2..cafe3764d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -32,6 +32,7 @@ import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.Triggerable;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.flowfile.FlowFile;
@@ -1227,4 +1228,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
      * @param defaultBackPressureDataSizeThreshold new default back pressure size threshold (must include size unit label)
      */
     void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold);
+
+    /**
+     * @return the combined FlowFile count and byte size queued in this Process Group's Connections and, recursively, in all child Process Groups
+     */
+    QueueSize getQueueSize();
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7ea5e82455..ce5fc64f83 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -373,7 +373,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
     private boolean clustered;
 
     // guarded by rwLock
-    private NodeConnectionStatus connectionStatus;
+    private volatile NodeConnectionStatus connectionStatus;
 
     private StatusAnalyticsEngine analyticsEngine;
 
@@ -2136,27 +2136,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         return resetValue;
     }
 
-    //
-    // Access to controller status
-    //
-    public QueueSize getTotalFlowFileCount(final ProcessGroup group) {
-        int count = 0;
-        long contentSize = 0L;
-
-        for (final Connection connection : group.getConnections()) {
-            final QueueSize size = connection.getFlowFileQueue().size();
-            count += size.getObjectCount();
-            contentSize += size.getByteCount();
-        }
-        for (final ProcessGroup childGroup : group.getProcessGroups()) {
-            final QueueSize size = getTotalFlowFileCount(childGroup);
-            count += size.getObjectCount();
-            contentSize += size.getByteCount();
-        }
-
-        return new QueueSize(count, contentSize);
-    }
-
     public class GroupStatusCounts {
         private int queuedCount = 0;
         private long queuedContentSize = 0;
@@ -3031,12 +3010,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
         try {
             HeartbeatBean bean = heartbeatBeanRef.get();
             if (bean == null) {
-                readLock.lock();
-                try {
-                    bean = new HeartbeatBean(flowManager.getRootGroup(), isPrimary());
-                } finally {
-                    readLock.unlock("createHeartbeatMessage");
-                }
+                bean = new HeartbeatBean(flowManager.getRootGroup(), isPrimary());
             }
 
             // create heartbeat payload
@@ -3045,7 +3019,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
             hbPayload.setActiveThreadCount(getActiveThreadCount());
             hbPayload.setRevisionUpdateCount(revisionManager.getRevisionUpdateCount());
 
-            final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
+            final QueueSize queueSize = bean.getRootGroup().getQueueSize();
             hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
             hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
             hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 35657d0d3f..fe073e2fc9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -32,6 +32,7 @@ import org.apache.nifi.controller.Template;
 import org.apache.nifi.controller.flow.FlowManager;
 import org.apache.nifi.controller.label.Label;
 import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
 import org.apache.nifi.controller.service.ControllerServiceNode;
 import org.apache.nifi.flow.VersionedExternalFlow;
 import org.apache.nifi.groups.BatchCounts;
@@ -853,6 +854,11 @@ public class MockProcessGroup implements ProcessGroup {
         this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
     }
 
+    @Override
+    public QueueSize getQueueSize() {
+        return new QueueSize(0, 0L); // empty size rather than null: callers dereference the result immediately and would otherwise throw NullPointerException
+    }
+
     @Override
     public void terminateProcessor(ProcessorNode processor) {
     }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
index 42a0763b69..43ec271ffa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
@@ -68,7 +68,7 @@ public class NaiveRevisionManager implements RevisionManager {
     }
 
     @Override
-    public synchronized long getRevisionUpdateCount() {
+    public long getRevisionUpdateCount() { // NOTE(review): unsynchronized read; presumably revisionUpdateCounter is an atomic counter so get() is safe without the monitor - confirm
         return revisionUpdateCounter.get();
     }