You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/29 05:42:30 UTC

[seatunnel] branch dev updated: [Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ea440427c [Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)
ea440427c is described below

commit ea440427cf838bfae62a652be9418c5f6ea8d1ea
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon May 29 13:42:21 2023 +0800

    [Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)
---
 .../apache/seatunnel/engine/common/Constant.java   |  2 +-
 .../server/checkpoint/CheckpointManager.java       |  7 +---
 .../server/checkpoint/IMapCheckpointIDCounter.java | 41 +++++++++++++++++-----
 3 files changed, 35 insertions(+), 15 deletions(-)

diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index bbb01167f..12e13bc19 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -53,7 +53,7 @@ public class Constant {
 
     public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap";
 
-    public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";
+    public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";
 
     public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics";
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 971266e09..48bc9a454 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -42,7 +42,6 @@ import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
 import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
-import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
 import lombok.extern.slf4j.Slf4j;
@@ -53,8 +52,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
-
 /**
  * Used to manage all checkpoints for a job.
  *
@@ -99,8 +96,6 @@ public class CheckpointManager {
                                 CheckpointStorageFactory.class,
                                 checkpointConfig.getStorage().getStorage())
                         .create(checkpointConfig.getStorage().getStoragePluginConfig());
-        IMap<Integer, Long> checkpointIdMap =
-                nodeEngine.getHazelcastInstance().getMap(String.format(IMAP_CHECKPOINT_ID, jobId));
         this.coordinatorMap =
                 checkpointPlanMap
                         .values()
@@ -109,7 +104,7 @@ public class CheckpointManager {
                                 plan -> {
                                     IMapCheckpointIDCounter idCounter =
                                             new IMapCheckpointIDCounter(
-                                                    plan.getPipelineId(), checkpointIdMap);
+                                                    jobId, plan.getPipelineId(), nodeEngine);
                                     try {
                                         idCounter.start();
                                         PipelineState pipelineState =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index 153e7b652..ffbcea0e1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -24,25 +24,34 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus;
 
 import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.map.IMap;
+import com.hazelcast.spi.impl.NodeEngine;
 
+import java.nio.ByteBuffer;
+import java.util.Base64;
 import java.util.concurrent.CompletableFuture;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
 
 public class IMapCheckpointIDCounter implements CheckpointIDCounter {
+
+    private final Long jobID;
     private final Integer pipelineId;
-    private final IMap<Integer, Long> checkpointIdMap;
+    private final String key;
+    private final IMap<String, Long> checkpointIdMap;
 
-    public IMapCheckpointIDCounter(Integer pipelineId, IMap<Integer, Long> checkpointIdMap) {
+    public IMapCheckpointIDCounter(Long jobID, Integer pipelineId, NodeEngine nodeEngine) {
+        this.jobID = jobID;
         this.pipelineId = pipelineId;
-        this.checkpointIdMap = checkpointIdMap;
+        this.key = convertLongIntToBase64(jobID, pipelineId);
+        this.checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(IMAP_CHECKPOINT_ID);
     }
 
     @Override
     public void start() throws Exception {
         RetryUtils.retryWithException(
                 () -> {
-                    return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
+                    return checkpointIdMap.putIfAbsent(key, INITIAL_CHECKPOINT_ID);
                 },
                 new RetryUtils.RetryMaterial(
                         Constant.OPERATION_RETRY_TIME,
@@ -54,25 +63,41 @@ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
     @Override
     public CompletableFuture<Void> shutdown(PipelineStatus pipelineStatus) {
         if (pipelineStatus.isEndState()) {
-            checkpointIdMap.remove(pipelineId);
+            checkpointIdMap.remove(key);
         }
         return CompletableFuture.completedFuture(null);
     }
 
     @Override
     public long getAndIncrement() throws Exception {
-        Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1);
+        Long nextId = checkpointIdMap.compute(key, (k, v) -> v == null ? null : v + 1);
         checkNotNull(nextId);
         return nextId - 1;
     }
 
     @Override
     public long get() {
-        return checkpointIdMap.get(pipelineId);
+        return checkpointIdMap.get(key);
     }
 
     @Override
     public void setCount(long newId) throws Exception {
-        checkpointIdMap.put(pipelineId, newId);
+        checkpointIdMap.put(key, newId);
+    }
+
+    public static String convertLongIntToBase64(long longValue, int intValue) {
+        ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+        buffer.putLong(longValue);
+        buffer.putInt(intValue);
+        byte[] bytes = buffer.array();
+        return Base64.getEncoder().encodeToString(bytes);
+    }
+
+    public static long[] convertBase64ToLongInt(String encodedStr) {
+        byte[] decodedBytes = Base64.getDecoder().decode(encodedStr);
+        ByteBuffer buffer = ByteBuffer.wrap(decodedBytes);
+        long longValue = buffer.getLong();
+        int intValue = buffer.getInt();
+        return new long[] {longValue, intValue};
     }
 }