You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by li...@apache.org on 2023/05/21 13:36:24 UTC
[incubator-seatunnel] branch dev updated: [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 73ddfc4f2 [Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
73ddfc4f2 is described below
commit 73ddfc4f28423b35856625f59c39404afe1ec6d1
Author: Jia Fan <fa...@qq.com>
AuthorDate: Sun May 21 21:36:16 2023 +0800
[Improve][Zeta] Improve Zeta operation max count and ignore NPE (#4787)
* [Improve][Zeta] Improve Zeta operation max count and ignore NPE
* [Improve][Zeta] Improve Zeta operation max count and ignore NPE
---
config/hazelcast.yaml | 2 +-
.../seatunnel/engine/server/master/JobMaster.java | 50 +++++++++++-----------
2 files changed, 26 insertions(+), 26 deletions(-)
diff --git a/config/hazelcast.yaml b/config/hazelcast.yaml
index 84a6e2457..87f607960 100644
--- a/config/hazelcast.yaml
+++ b/config/hazelcast.yaml
@@ -37,5 +37,5 @@ hazelcast:
hazelcast.invocation.max.retry.count: 20
hazelcast.tcp.join.port.try.count: 30
hazelcast.logging.type: log4j2
- hazelcast.operation.generic.thread.count: 100
+ hazelcast.operation.generic.thread.count: 1000
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index d13fcbbd1..de9da0a1b 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -553,31 +553,31 @@ public class JobMaster {
}
private void cleanTaskGroupContext(PipelineLocation pipelineLocation) {
- ownedSlotProfilesIMap
- .get(pipelineLocation)
- .forEach(
- (taskGroupLocation, slotProfile) -> {
- try {
- if (nodeEngine
- .getClusterService()
- .getMember(slotProfile.getWorker())
- != null) {
- NodeEngineUtil.sendOperationToMemberNode(
- nodeEngine,
- new CleanTaskGroupContextOperation(
- taskGroupLocation),
- slotProfile.getWorker())
- .get();
- }
- } catch (HazelcastInstanceNotActiveException e) {
- LOGGER.warning(
- String.format(
- "%s clean TaskGroupContext with exception: %s.",
- taskGroupLocation, ExceptionUtils.getMessage(e)));
- } catch (Exception e) {
- throw new SeaTunnelException(e.getMessage());
- }
- });
+ Map<TaskGroupLocation, SlotProfile> slotProfileMap =
+ ownedSlotProfilesIMap.get(pipelineLocation);
+ if (slotProfileMap == null) {
+ return;
+ }
+ slotProfileMap.forEach(
+ (taskGroupLocation, slotProfile) -> {
+ try {
+ if (nodeEngine.getClusterService().getMember(slotProfile.getWorker())
+ != null) {
+ NodeEngineUtil.sendOperationToMemberNode(
+ nodeEngine,
+ new CleanTaskGroupContextOperation(taskGroupLocation),
+ slotProfile.getWorker())
+ .get();
+ }
+ } catch (HazelcastInstanceNotActiveException e) {
+ LOGGER.warning(
+ String.format(
+ "%s clean TaskGroupContext with exception: %s.",
+ taskGroupLocation, ExceptionUtils.getMessage(e)));
+ } catch (Exception e) {
+ throw new SeaTunnelException(e.getMessage());
+ }
+ });
}
public PhysicalPlan getPhysicalPlan() {