You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2023/05/29 05:42:30 UTC
[seatunnel] branch dev updated: [Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)
This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ea440427c [Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)
ea440427c is described below
commit ea440427cf838bfae62a652be9418c5f6ea8d1ea
Author: ic4y <83...@users.noreply.github.com>
AuthorDate: Mon May 29 13:42:21 2023 +0800
[Improve][Zeta] Reduce the number of IMAPs used by checkpointIdCounter (#4832)
---
.../apache/seatunnel/engine/common/Constant.java | 2 +-
.../server/checkpoint/CheckpointManager.java | 7 +---
.../server/checkpoint/IMapCheckpointIDCounter.java | 41 +++++++++++++++++-----
3 files changed, 35 insertions(+), 15 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
index bbb01167f..12e13bc19 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/Constant.java
@@ -53,7 +53,7 @@ public class Constant {
public static final String IMAP_OWNED_SLOT_PROFILES = "engine_ownedSlotProfilesIMap";
- public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-%d";
+ public static final String IMAP_CHECKPOINT_ID = "engine_checkpoint-id-map";
public static final String IMAP_RUNNING_JOB_METRICS = "engine_runningJobMetrics";
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index 971266e09..48bc9a454 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -42,7 +42,6 @@ import org.apache.seatunnel.engine.server.task.operation.TaskOperation;
import org.apache.seatunnel.engine.server.task.statemachine.SeaTunnelTaskState;
import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
-import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.operationservice.impl.InvocationFuture;
import lombok.extern.slf4j.Slf4j;
@@ -53,8 +52,6 @@ import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
-
/**
* Used to manage all checkpoints for a job.
*
@@ -99,8 +96,6 @@ public class CheckpointManager {
CheckpointStorageFactory.class,
checkpointConfig.getStorage().getStorage())
.create(checkpointConfig.getStorage().getStoragePluginConfig());
- IMap<Integer, Long> checkpointIdMap =
- nodeEngine.getHazelcastInstance().getMap(String.format(IMAP_CHECKPOINT_ID, jobId));
this.coordinatorMap =
checkpointPlanMap
.values()
@@ -109,7 +104,7 @@ public class CheckpointManager {
plan -> {
IMapCheckpointIDCounter idCounter =
new IMapCheckpointIDCounter(
- plan.getPipelineId(), checkpointIdMap);
+ jobId, plan.getPipelineId(), nodeEngine);
try {
idCounter.start();
PipelineState pipelineState =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
index 153e7b652..ffbcea0e1 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/IMapCheckpointIDCounter.java
@@ -24,25 +24,34 @@ import org.apache.seatunnel.engine.core.job.PipelineStatus;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
+import com.hazelcast.spi.impl.NodeEngine;
+import java.nio.ByteBuffer;
+import java.util.Base64;
import java.util.concurrent.CompletableFuture;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
public class IMapCheckpointIDCounter implements CheckpointIDCounter {
+
+ private final Long jobID;
private final Integer pipelineId;
- private final IMap<Integer, Long> checkpointIdMap;
+ private final String key;
+ private final IMap<String, Long> checkpointIdMap;
- public IMapCheckpointIDCounter(Integer pipelineId, IMap<Integer, Long> checkpointIdMap) {
+ public IMapCheckpointIDCounter(Long jobID, Integer pipelineId, NodeEngine nodeEngine) {
+ this.jobID = jobID;
this.pipelineId = pipelineId;
- this.checkpointIdMap = checkpointIdMap;
+ this.key = convertLongIntToBase64(jobID, pipelineId);
+ this.checkpointIdMap = nodeEngine.getHazelcastInstance().getMap(IMAP_CHECKPOINT_ID);
}
@Override
public void start() throws Exception {
RetryUtils.retryWithException(
() -> {
- return checkpointIdMap.putIfAbsent(pipelineId, INITIAL_CHECKPOINT_ID);
+ return checkpointIdMap.putIfAbsent(key, INITIAL_CHECKPOINT_ID);
},
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
@@ -54,25 +63,41 @@ public class IMapCheckpointIDCounter implements CheckpointIDCounter {
@Override
public CompletableFuture<Void> shutdown(PipelineStatus pipelineStatus) {
if (pipelineStatus.isEndState()) {
- checkpointIdMap.remove(pipelineId);
+ checkpointIdMap.remove(key);
}
return CompletableFuture.completedFuture(null);
}
@Override
public long getAndIncrement() throws Exception {
- Long nextId = checkpointIdMap.compute(pipelineId, (k, v) -> v == null ? null : v + 1);
+ Long nextId = checkpointIdMap.compute(key, (k, v) -> v == null ? null : v + 1);
checkNotNull(nextId);
return nextId - 1;
}
@Override
public long get() {
- return checkpointIdMap.get(pipelineId);
+ return checkpointIdMap.get(key);
}
@Override
public void setCount(long newId) throws Exception {
- checkpointIdMap.put(pipelineId, newId);
+ checkpointIdMap.put(key, newId);
+ }
+
+ public static String convertLongIntToBase64(long longValue, int intValue) {
+ ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES + Integer.BYTES);
+ buffer.putLong(longValue);
+ buffer.putInt(intValue);
+ byte[] bytes = buffer.array();
+ return Base64.getEncoder().encodeToString(bytes);
+ }
+
+ public static long[] convertBase64ToLongInt(String encodedStr) {
+ byte[] decodedBytes = Base64.getDecoder().decode(encodedStr);
+ ByteBuffer buffer = ByteBuffer.wrap(decodedBytes);
+ long longValue = buffer.getLong();
+ int intValue = buffer.getInt();
+ return new long[] {longValue, intValue};
}
}