You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/05/26 12:52:31 UTC
[seatunnel] branch dev updated: [Hotfix][Zeta] Fix cpu load problem (#4828)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7a81fd375 [Hotfix][Zeta] Fix cpu load problem (#4828)
7a81fd375 is described below
commit 7a81fd3751048492df5d3b38ce4bb5a5d7a26986
Author: Eric <ga...@gmail.com>
AuthorDate: Fri May 26 20:52:26 2023 +0800
[Hotfix][Zeta] Fix cpu load problem (#4828)
---
.../engine/server/task/SeaTunnelSourceCollector.java | 11 +++++++++++
.../engine/server/task/flow/SourceFlowLifeCycle.java | 5 +++++
2 files changed, 16 insertions(+)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 7ebcdf0d8..5cab2dd0b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -40,6 +40,8 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
private final Meter sourceReceivedQPS;
+ private volatile long rowCountThisPollNext;
+
public SeaTunnelSourceCollector(
Object checkpointLock,
List<OneInputFlowLifeCycle<Record<?>>> outputs,
@@ -54,6 +56,7 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
public void collect(T row) {
try {
sendRecordToNext(new Record<>(row));
+ rowCountThisPollNext++;
sourceReceivedCount.inc();
sourceReceivedQPS.markEvent();
} catch (IOException e) {
@@ -66,6 +69,14 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
return checkpointLock;
}
+ public long getRowCountThisPollNext() {
+ return this.rowCountThisPollNext;
+ }
+
+ public void resetRowCountThisPollNext() {
+ this.rowCountThisPollNext = 0;
+ }
+
public void sendRecordToNext(Record<?> record) throws IOException {
synchronized (checkpointLock) {
for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 16adec49e..8430f6898 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -133,6 +133,11 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
public void collect() throws Exception {
if (!prepareClose) {
reader.pollNext(collector);
+ if (collector.getRowCountThisPollNext() == 0) {
+ Thread.sleep(100);
+ } else {
+ collector.resetRowCountThisPollNext();
+ }
}
}