You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2023/05/21 13:36:24 UTC

[incubator-seatunnel] branch dev updated: [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)

This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 73ddfc4f2 [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
73ddfc4f2 is described below

commit 73ddfc4f28423b35856625f59c39404afe1ec6d1
Author: Jia Fan <fa...@qq.com>
AuthorDate: Sun May 21 21:36:16 2023 +0800

    [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
    
    * [Improve][Zeta] Improve Zeta operation max count and ignore NPE
    
    * [Improve][Zeta] Improve Zeta operation max count and ignore NPE
---
 config/hazelcast.yaml                              |  2 +-
 .../seatunnel/engine/server/master/JobMaster.java  | 50 +++++++++++-----------
 2 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/config/hazelcast.yaml b/config/hazelcast.yaml
index 84a6e2457..87f607960 100644
--- a/config/hazelcast.yaml
+++ b/config/hazelcast.yaml
@@ -37,5 +37,5 @@ hazelcast:
     hazelcast.invocation.max.retry.count: 20
     hazelcast.tcp.join.port.try.count: 30
     hazelcast.logging.type: log4j2
-    hazelcast.operation.generic.thread.count: 100
+    hazelcast.operation.generic.thread.count: 1000
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d13fcbbd1..de9da0a1b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -553,31 +553,31 @@ public class JobMaster {
     }
 
     private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
-        ownedSlotProfilesIMap
-                .get(pipelineLocation)
-                .forEach(
-                        (taskGroupLocation, slotProfile) -> {
-                            try {
-                                if (nodeEngine
-                                                .getClusterService()
-                                                .getMember(slotProfile.getWorker())
-                                        != null) {
-                                    NodeEngineUtil.sendOperationToMemberNode(
-                                                    nodeEngine,
-                                                    new CleanTaskGroupContextOperation(
-                                                            taskGroupLocation),
-                                                    slotProfile.getWorker())
-                                            .get();
-                                }
-                            } catch (HazelcastInstanceNotActiveException e) {
-                                LOGGER.warning(
-                                        String.format(
-                                                "%s clean TaskGroupContext with exception: %s.",
-                                                taskGroupLocation, ExceptionUtils.getMessage(e)));
-                            } catch (Exception e) {
-                                throw new SeaTunnelException(e.getMessage());
-                            }
-                        });
+        Map<TaskGroupLocation, SlotProfile> slotProfileMap =
+                ownedSlotProfilesIMap.get(pipelineLocation);
+        if (slotProfileMap == null) {
+            return;
+        }
+        slotProfileMap.forEach(
+                (taskGroupLocation, slotProfile) -> {
+                    try {
+                        if (nodeEngine.getClusterService().getMember(slotProfile.getWorker())
+                                != null) {
+                            NodeEngineUtil.sendOperationToMemberNode(
+                                            nodeEngine,
+                                            new CleanTaskGroupContextOperation(taskGroupLocation),
+                                            slotProfile.getWorker())
+                                    .get();
+                        }
+                    } catch (HazelcastInstanceNotActiveException e) {
+                        LOGGER.warning(
+                                String.format(
+                                        "%s clean TaskGroupContext with exception: %s.",
+                                        taskGroupLocation, ExceptionUtils.getMessage(e)));
+                    } catch (Exception e) {
+                        throw new SeaTunnelException(e.getMessage());
+                    }
+                });
     }
 
     public PhysicalPlan getPhysicalPlan() {