You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/08/17 18:06:06 UTC
[nifi] branch main updated: NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 2685856c62 NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)
2685856c62 is described below
commit 2685856c629aa1bda20019981ed932ccecf9415a
Author: Hsin-Ying Lee <s9...@gmail.com>
AuthorDate: Thu Aug 18 02:05:55 2022 +0800
NIFI-10052 Avoid obtaining any locks when creating/sending heartbeats (#6298)
---
.../apache/nifi/groups/StandardProcessGroup.java | 22 +++++++++++++--
.../java/org/apache/nifi/groups/ProcessGroup.java | 6 ++++
.../org/apache/nifi/controller/FlowController.java | 32 ++--------------------
.../controller/service/mock/MockProcessGroup.java | 6 ++++
.../nifi/web/revision/NaiveRevisionManager.java | 2 +-
5 files changed, 36 insertions(+), 32 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 19d4fb2046..3e8ee1ea38 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -182,8 +182,8 @@ public final class StandardProcessGroup implements ProcessGroup {
private final Map<String, Port> inputPorts = new HashMap<>();
private final Map<String, Port> outputPorts = new HashMap<>();
- private final Map<String, Connection> connections = new HashMap<>();
- private final Map<String, ProcessGroup> processGroups = new HashMap<>();
+ private final Map<String, Connection> connections = new ConcurrentHashMap<>();
+ private final Map<String, ProcessGroup> processGroups = new ConcurrentHashMap<>();
private final Map<String, Label> labels = new HashMap<>();
private final Map<String, RemoteProcessGroup> remoteGroups = new HashMap<>();
private final Map<String, ProcessorNode> processors = new HashMap<>();
@@ -4255,6 +4255,24 @@ public final class StandardProcessGroup implements ProcessGroup {
}
}
+ @Override
+ public QueueSize getQueueSize() {
+ int count = 0;
+ long contentSize = 0L;
+
+ for (final ProcessGroup childGroup : processGroups.values()) {
+ final QueueSize queueSize = childGroup.getQueueSize();
+ count += queueSize.getObjectCount();
+ contentSize += queueSize.getByteCount();
+ }
+ for (final Connection connection : connections.values()) {
+ final QueueSize queueSize = connection.getFlowFileQueue().size();
+ count += queueSize.getObjectCount();
+ contentSize += queueSize.getByteCount();
+ }
+ return new QueueSize(count, contentSize);
+ }
+
@Override
public String getDefaultBackPressureDataSizeThreshold() {
// Use value in this object if it has been set. Otherwise, inherit from parent group; if at root group, obtain from nifi properties.
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
index 7579b721a2..cafe3764d8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/groups/ProcessGroup.java
@@ -32,6 +32,7 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.Triggerable;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.flowfile.FlowFile;
@@ -1227,4 +1228,9 @@ public interface ProcessGroup extends ComponentAuthorizable, Positionable, Versi
* @param defaultBackPressureDataSizeThreshold new default back pressure size threshold (must include size unit label)
*/
void setDefaultBackPressureDataSizeThreshold(String defaultBackPressureDataSizeThreshold);
+
+ /**
+ * @return the QueueSize of this Process Group and all child Process Groups
+ */
+ QueueSize getQueueSize();
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 7ea5e82455..ce5fc64f83 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -373,7 +373,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
private boolean clustered;
// guarded by rwLock
- private NodeConnectionStatus connectionStatus;
+ private volatile NodeConnectionStatus connectionStatus;
private StatusAnalyticsEngine analyticsEngine;
@@ -2136,27 +2136,6 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
return resetValue;
}
- //
- // Access to controller status
- //
- public QueueSize getTotalFlowFileCount(final ProcessGroup group) {
- int count = 0;
- long contentSize = 0L;
-
- for (final Connection connection : group.getConnections()) {
- final QueueSize size = connection.getFlowFileQueue().size();
- count += size.getObjectCount();
- contentSize += size.getByteCount();
- }
- for (final ProcessGroup childGroup : group.getProcessGroups()) {
- final QueueSize size = getTotalFlowFileCount(childGroup);
- count += size.getObjectCount();
- contentSize += size.getByteCount();
- }
-
- return new QueueSize(count, contentSize);
- }
-
public class GroupStatusCounts {
private int queuedCount = 0;
private long queuedContentSize = 0;
@@ -3031,12 +3010,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
try {
HeartbeatBean bean = heartbeatBeanRef.get();
if (bean == null) {
- readLock.lock();
- try {
- bean = new HeartbeatBean(flowManager.getRootGroup(), isPrimary());
- } finally {
- readLock.unlock("createHeartbeatMessage");
- }
+ bean = new HeartbeatBean(flowManager.getRootGroup(), isPrimary());
}
// create heartbeat payload
@@ -3045,7 +3019,7 @@ public class FlowController implements ReportingTaskProvider, Authorizable, Node
hbPayload.setActiveThreadCount(getActiveThreadCount());
hbPayload.setRevisionUpdateCount(revisionManager.getRevisionUpdateCount());
- final QueueSize queueSize = getTotalFlowFileCount(bean.getRootGroup());
+ final QueueSize queueSize = bean.getRootGroup().getQueueSize();
hbPayload.setTotalFlowFileCount(queueSize.getObjectCount());
hbPayload.setTotalFlowFileBytes(queueSize.getByteCount());
hbPayload.setClusterStatus(clusterCoordinator.getConnectionStatuses());
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
index 35657d0d3f..fe073e2fc9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/service/mock/MockProcessGroup.java
@@ -32,6 +32,7 @@ import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.flow.FlowManager;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.queue.DropFlowFileStatus;
+import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.flow.VersionedExternalFlow;
import org.apache.nifi.groups.BatchCounts;
@@ -853,6 +854,11 @@ public class MockProcessGroup implements ProcessGroup {
this.defaultBackPressureDataSizeThreshold = defaultBackPressureDataSizeThreshold;
}
+ @Override
+ public QueueSize getQueueSize() {
+ return null;
+ }
+
@Override
public void terminateProcessor(ProcessorNode processor) {
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
index 42a0763b69..43ec271ffa 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-web/nifi-web-optimistic-locking/src/main/java/org/apache/nifi/web/revision/NaiveRevisionManager.java
@@ -68,7 +68,7 @@ public class NaiveRevisionManager implements RevisionManager {
}
@Override
- public synchronized long getRevisionUpdateCount() {
+ public long getRevisionUpdateCount() {
return revisionUpdateCounter.get();
}