You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this text rendering.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/05/26 12:52:31 UTC

[seatunnel] branch dev updated: [Hotfix][Zeta] Fix cpu load problem (#4828)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7a81fd375 [Hotfix][Zeta] Fix cpu load problem (#4828)
7a81fd375 is described below

commit 7a81fd3751048492df5d3b38ce4bb5a5d7a26986
Author: Eric <ga...@gmail.com>
AuthorDate: Fri May 26 20:52:26 2023 +0800

    [Hotfix][Zeta] Fix cpu load problem (#4828)
---
 .../engine/server/task/SeaTunnelSourceCollector.java          | 11 +++++++++++
 .../engine/server/task/flow/SourceFlowLifeCycle.java          |  5 +++++
 2 files changed, 16 insertions(+)

diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 7ebcdf0d8..5cab2dd0b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -40,6 +40,8 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
 
     private final Meter sourceReceivedQPS;
 
+    private volatile long rowCountThisPollNext;
+
     public SeaTunnelSourceCollector(
             Object checkpointLock,
             List<OneInputFlowLifeCycle<Record<?>>> outputs,
@@ -54,6 +56,7 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
     public void collect(T row) {
         try {
             sendRecordToNext(new Record<>(row));
+            rowCountThisPollNext++;
             sourceReceivedCount.inc();
             sourceReceivedQPS.markEvent();
         } catch (IOException e) {
@@ -66,6 +69,14 @@ public class SeaTunnelSourceCollector<T> implements Collector<T> {
         return checkpointLock;
     }
 
+    public long getRowCountThisPollNext() {
+        return this.rowCountThisPollNext;
+    }
+
+    public void resetRowCountThisPollNext() {
+        this.rowCountThisPollNext = 0;
+    }
+
     public void sendRecordToNext(Record<?> record) throws IOException {
         synchronized (checkpointLock) {
             for (OneInputFlowLifeCycle<Record<?>> output : outputs) {
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
index 16adec49e..8430f6898 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SourceFlowLifeCycle.java
@@ -133,6 +133,11 @@ public class SourceFlowLifeCycle<T, SplitT extends SourceSplit> extends ActionFl
     public void collect() throws Exception {
         if (!prepareClose) {
             reader.pollNext(collector);
+            if (collector.getRowCountThisPollNext() == 0) {
+                Thread.sleep(100);
+            } else {
+                collector.resetRowCountThisPollNext();
+            }
         }
     }