You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by fa...@apache.org on 2022/06/14 02:22:51 UTC

[incubator-seatunnel] branch api-draft updated: [Api-draft] change kafka sink transaction (#2010)

This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 62c8a175 [Api-draft] change kafka sink transaction (#2010)
62c8a175 is described below

commit 62c8a1753dc9956c6aa2b76ba6bec1392700cc80
Author: Hisoka <10...@qq.com>
AuthorDate: Tue Jun 14 10:22:46 2022 +0800

    [Api-draft] change kafka sink transaction (#2010)
    
    * change kafka sink transaction
    
    * support kafka transaction
    
    * fixed kafka sink transaction problem
---
 .../seatunnel/common/utils/ReflectionUtils.java    |  49 ++++++
 .../connectors/seatunnel/kafka/config/Config.java  |   5 +
 .../kafka/sink/KafkaInternalProducer.java          | 164 +++++++++++++++++++++
 .../kafka/sink/KafkaNoTransactionSender.java       |   6 +-
 .../seatunnel/kafka/sink/KafkaProduceSender.java   |   8 +-
 .../seatunnel/kafka/sink/KafkaSinkCommitter.java   |  18 ++-
 .../seatunnel/kafka/sink/KafkaSinkWriter.java      |  62 ++++++--
 .../kafka/sink/KafkaTransactionSender.java         |  60 ++++----
 .../kafka/source/KafkaSourceSplitEnumerator.java   |   2 +-
 .../seatunnel/kafka/state/KafkaCommitInfo.java     |   2 +
 .../seatunnel/kafka/state/KafkaSinkState.java      |   2 +
 11 files changed, 325 insertions(+), 53 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
index 597a8df1..3edd9ccc 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.common.utils;
 
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.util.Optional;
 
@@ -39,4 +41,51 @@ public class ReflectionUtils {
         return method;
     }
 
+    public static Optional<Object> getField(Object object, Class<?> clazz, String fieldName) {
+        try {
+            Field field = clazz.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            return Optional.of(field.get(object));
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            return Optional.empty();
+        }
+    }
+
+    public static Optional<Object> getField(Object object, String fieldName) {
+        return getField(object, object.getClass(), fieldName);
+    }
+
+    public static void setField(Object object, Class<?> clazz, String fieldName, Object value) {
+        try {
+            Field field = clazz.getDeclaredField(fieldName);
+            field.setAccessible(true);
+            field.set(object, value);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException("Incompatible KafkaProducer version", e);
+        }
+    }
+
+    public static void setField(Object object, String fieldName, Object value) {
+        setField(object, object.getClass(), fieldName, value);
+    }
+
+    public static Object invoke(Object object, String methodName, Object... args) {
+        Class<?>[] argTypes = new Class[args.length];
+        for (int i = 0; i < args.length; i++) {
+            argTypes[i] = args[i].getClass();
+        }
+        return invoke(object, methodName, argTypes, args);
+    }
+
+    public static Object invoke(
+            Object object, String methodName, Class<?>[] argTypes, Object[] args) {
+        try {
+            Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
+            method.setAccessible(true);
+            return method.invoke(object, args);
+        } catch (NoSuchMethodException | InvocationTargetException | IllegalAccessException e) {
+            throw new RuntimeException("method invoke failed", e);
+        }
+    }
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 89f9c232..ee149204 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -45,4 +45,9 @@ public class Config {
      * consumer group of kafka client consume message.
      */
     public static final String COMMIT_ON_CHECKPOINT = "commit_on_checkpoint";
+
+    /**
+     * The prefix of Kafka's transactionId; make sure different jobs use different prefixes.
+     */
+    public static final String TRANSACTION_PREFIX = "transaction_prefix";
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
new file mode 100644
index 00000000..77d2535f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaInternalProducer.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
+
+import org.apache.seatunnel.common.utils.ReflectionUtils;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.internals.TransactionManager;
+import org.apache.kafka.common.errors.ProducerFencedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.util.Optional;
+import java.util.Properties;
+
+/**
+ * A {@link KafkaProducer} that allows resuming a transaction from a transactionId.
+ */
+public class KafkaInternalProducer<K, V> extends KafkaProducer<K, V> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaInternalProducer.class);
+
+    private static final String TRANSACTION_MANAGER_STATE_ENUM =
+            "org.apache.kafka.clients.producer.internals.TransactionManager$State";
+    private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
+    private String transactionalId;
+
+    public KafkaInternalProducer(Properties properties, String transactionId) {
+        super(properties);
+        this.transactionalId = transactionId;
+    }
+
+    @Override
+    public void initTransactions() {
+        setTransactionalId(this.transactionalId);
+        super.initTransactions();
+    }
+
+    @Override
+    public void beginTransaction() throws ProducerFencedException {
+        super.beginTransaction();
+    }
+
+    @Override
+    public void commitTransaction() throws ProducerFencedException {
+        super.commitTransaction();
+    }
+
+    @Override
+    public void abortTransaction() throws ProducerFencedException {
+        super.abortTransaction();
+    }
+
+    public void setTransactionalId(String transactionalId) {
+        if (!transactionalId.equals(this.transactionalId)) {
+            Object transactionManager = getTransactionManager();
+            synchronized (transactionManager) {
+                ReflectionUtils.setField(transactionManager, "transactionalId", transactionalId);
+                ReflectionUtils.setField(transactionManager, "currentState",
+                        getTransactionManagerState("UNINITIALIZED"));
+                this.transactionalId = transactionalId;
+            }
+        }
+    }
+
+    public short getEpoch() {
+        Object transactionManager = getTransactionManager();
+        Optional<Object> producerIdAndEpoch = ReflectionUtils.getField(transactionManager,
+                PRODUCER_ID_AND_EPOCH_FIELD_NAME);
+        return (short) ReflectionUtils.getField(producerIdAndEpoch.get(), "epoch").get();
+    }
+
+    public long getProducerId() {
+        Object transactionManager = getTransactionManager();
+        Object producerIdAndEpoch = ReflectionUtils.getField(transactionManager,
+                PRODUCER_ID_AND_EPOCH_FIELD_NAME).get();
+        return (long) ReflectionUtils.getField(producerIdAndEpoch, "producerId").get();
+    }
+
+    public void resumeTransaction(long producerId, short epoch) {
+
+        LOGGER.info(
+                "Attempting to resume transaction {} with producerId {} and epoch {}",
+                transactionalId,
+                producerId,
+                epoch);
+
+        Object transactionManager = getTransactionManager();
+        synchronized (transactionManager) {
+            Object topicPartitionBookkeeper =
+                    ReflectionUtils.getField(transactionManager, transactionManager.getClass(),
+                            "topicPartitionBookkeeper").get();
+
+            transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
+            ReflectionUtils.invoke(topicPartitionBookkeeper, "reset");
+
+            ReflectionUtils.setField(
+                    transactionManager, PRODUCER_ID_AND_EPOCH_FIELD_NAME,
+                    createProducerIdAndEpoch(producerId, epoch));
+
+            transitionTransactionManagerStateTo(transactionManager, "READY");
+
+            transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
+            ReflectionUtils.setField(transactionManager, "transactionStarted", true);
+        }
+    }
+
+    private static Object createProducerIdAndEpoch(long producerId, short epoch) {
+        try {
+            Field field =
+                    TransactionManager.class.getDeclaredField(PRODUCER_ID_AND_EPOCH_FIELD_NAME);
+            Class<?> clazz = field.getType();
+            Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
+            constructor.setAccessible(true);
+            return constructor.newInstance(producerId, epoch);
+        } catch (InvocationTargetException | InstantiationException | IllegalAccessException |
+                 NoSuchFieldException | NoSuchMethodException e) {
+            throw new RuntimeException("Incompatible KafkaProducer version", e);
+        }
+    }
+
+    private Object getTransactionManager() {
+        Optional<Object> transactionManagerOptional = ReflectionUtils.getField(this, KafkaProducer.class,
+                "transactionManager");
+        if (!transactionManagerOptional.isPresent()) {
+            throw new RuntimeException("can't get transactionManager in KafkaProducer");
+        }
+        return transactionManagerOptional.get();
+    }
+
+    private static void transitionTransactionManagerStateTo(
+            Object transactionManager, String state) {
+        ReflectionUtils.invoke(transactionManager, "transitionTo", getTransactionManagerState(state));
+    }
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    private static Enum<?> getTransactionManagerState(String enumName) {
+        try {
+            Class<Enum> cl = (Class<Enum>) Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
+            return Enum.valueOf(cl, enumName);
+        } catch (ClassNotFoundException e) {
+            throw new RuntimeException("Incompatible KafkaProducer version", e);
+        }
+    }
+
+}
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
index 4760830f..59a16ef0 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaNoTransactionSender.java
@@ -48,7 +48,7 @@ public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K, V>
     }
 
     @Override
-    public void beginTransaction() {
+    public void beginTransaction(String transactionId) {
         // no-op
     }
 
@@ -63,12 +63,12 @@ public class KafkaNoTransactionSender<K, V> implements KafkaProduceSender<K, V>
     }
 
     @Override
-    public void abortTransaction(List<KafkaSinkState> kafkaStates) {
+    public void abortTransaction(long checkpointId) {
         // no-op
     }
 
     @Override
-    public List<KafkaSinkState> snapshotState() {
+    public List<KafkaSinkState> snapshotState(long checkpointId) {
         kafkaProducer.flush();
         return Collections.emptyList();
     }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
index b9aa4105..3e1ef080 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaProduceSender.java
@@ -33,7 +33,7 @@ public interface KafkaProduceSender<K, V> extends AutoCloseable {
      */
     void send(ProducerRecord<K, V> producerRecord);
 
-    void beginTransaction();
+    void beginTransaction(String transactionId);
 
     /**
      * Prepare a transaction commit.
@@ -50,15 +50,15 @@ public interface KafkaProduceSender<K, V> extends AutoCloseable {
     /**
      * Abort the given transaction.
      *
-     * @param kafkaStates kafka states about the transaction info.
+     * @param checkpointId the id of the last checkpoint of the last run
      */
-    void abortTransaction(List<KafkaSinkState> kafkaStates);
+    void abortTransaction(long checkpointId);
 
     /**
      * Get the current kafka state of the sender.
      *
      * @return kafka state List, or empty if no state is available.
      */
-    List<KafkaSinkState> snapshotState();
+    List<KafkaSinkState> snapshotState(long checkpointId);
 
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
index 5d70ff8e..aafa0bd4 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkCommitter.java
@@ -36,6 +36,8 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
 
     private final Config pluginConfig;
 
+    private KafkaInternalProducer<?, ?> kafkaProducer;
+
     public KafkaSinkCommitter(Config pluginConfig) {
         this.pluginConfig = pluginConfig;
     }
@@ -52,6 +54,7 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
             }
             KafkaProducer<?, ?> producer = getProducer(commitInfo);
             producer.commitTransaction();
+            producer.flush();
         }
         return commitInfos;
     }
@@ -67,11 +70,16 @@ public class KafkaSinkCommitter implements SinkCommitter<KafkaCommitInfo> {
         }
     }
 
-    private KafkaProducer<?, ?> getProducer(KafkaCommitInfo kafkaCommitInfo) {
-        Properties kafkaProperties = kafkaCommitInfo.getKafkaProperties();
-        kafkaProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, kafkaCommitInfo.getTransactionId());
-        KafkaProducer<?, ?> kafkaProducer = new KafkaProducer<>(kafkaCommitInfo.getKafkaProperties());
-        kafkaProducer.initTransactions();
+    private KafkaInternalProducer<?, ?> getProducer(KafkaCommitInfo commitInfo) {
+        if (this.kafkaProducer != null) {
+            this.kafkaProducer.setTransactionalId(commitInfo.getTransactionId());
+        } else {
+            Properties kafkaProperties = commitInfo.getKafkaProperties();
+            kafkaProperties.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, commitInfo.getTransactionId());
+            kafkaProducer =
+                    new KafkaInternalProducer<>(commitInfo.getKafkaProperties(), commitInfo.getTransactionId());
+        }
+        kafkaProducer.resumeTransaction(commitInfo.getProducerId(), commitInfo.getEpoch());
         return kafkaProducer;
     }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
index 697da6a4..e4045116 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkWriter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.TRANSACTION_PREFIX;
+
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
@@ -36,6 +38,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import java.util.List;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Random;
 
 /**
  * KafkaSinkWriter is a sink writer that will write {@link SeaTunnelRow} to Kafka.
@@ -43,10 +46,14 @@ import java.util.Properties;
 public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo, KafkaSinkState> {
 
     private final SinkWriter.Context context;
-    private SeaTunnelRowTypeInfo seaTunnelRowTypeInfo;
     private final Config pluginConfig;
 
-    private KafkaProduceSender<String, String> kafkaProducerSender;
+    private String transactionPrefix;
+    private long lastCheckpointId = 0;
+
+    private final KafkaProduceSender<String, String> kafkaProducerSender;
+
+    private static final int PREFIX_RANGE = 10000;
 
     // check config
     @Override
@@ -55,22 +62,33 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         kafkaProducerSender.send(producerRecord);
     }
 
-    private SeaTunnelRowSerializer<String, String> seaTunnelRowSerializer;
+    private final SeaTunnelRowSerializer<String, String> seaTunnelRowSerializer;
 
     public KafkaSinkWriter(
-        SinkWriter.Context context,
-        SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
-        Config pluginConfig,
-        List<KafkaSinkState> kafkaStates) {
+            SinkWriter.Context context,
+            SeaTunnelRowTypeInfo seaTunnelRowTypeInfo,
+            Config pluginConfig,
+            List<KafkaSinkState> kafkaStates) {
         this.context = context;
-        this.seaTunnelRowTypeInfo = seaTunnelRowTypeInfo;
         this.pluginConfig = pluginConfig;
+        if (pluginConfig.hasPath(TRANSACTION_PREFIX)) {
+            this.transactionPrefix = pluginConfig.getString(TRANSACTION_PREFIX);
+        } else {
+            Random random = new Random();
+            this.transactionPrefix = String.format("SeaTunnel%04d", random.nextInt(PREFIX_RANGE));
+        }
+        restoreState(kafkaStates);
         this.seaTunnelRowSerializer = getSerializer(pluginConfig, seaTunnelRowTypeInfo);
         if (KafkaSemantics.EXACTLY_ONCE.equals(getKafkaSemantics(pluginConfig))) {
-            // the recover state
-            this.kafkaProducerSender = new KafkaTransactionSender<>(getKafkaProperties(pluginConfig));
-            this.kafkaProducerSender.abortTransaction(kafkaStates);
-            this.kafkaProducerSender.beginTransaction();
+            this.kafkaProducerSender =
+                    new KafkaTransactionSender<>(this.transactionPrefix, getKafkaProperties(pluginConfig));
+            // abort all transactions with a checkpoint id bigger than the current one, because they may
+            //  have already started a transaction.
+            if (!kafkaStates.isEmpty()) {
+                this.kafkaProducerSender.abortTransaction(kafkaStates.get(0).getCheckpointId() + 1);
+            }
+            this.kafkaProducerSender.beginTransaction(generateTransactionId(this.transactionPrefix,
+                    this.lastCheckpointId + 1));
         } else {
             this.kafkaProducerSender = new KafkaNoTransactionSender<>(getKafkaProperties(pluginConfig));
         }
@@ -78,7 +96,11 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     @Override
     public List<KafkaSinkState> snapshotState(long checkpointId) {
-        return kafkaProducerSender.snapshotState();
+        List<KafkaSinkState> states = kafkaProducerSender.snapshotState(checkpointId);
+        this.lastCheckpointId = checkpointId;
+        this.kafkaProducerSender.beginTransaction(generateTransactionId(this.transactionPrefix,
+                this.lastCheckpointId + 1));
+        return states;
     }
 
     @Override
@@ -102,7 +124,7 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
 
     private Properties getKafkaProperties(Config pluginConfig) {
         Config kafkaConfig = TypesafeConfigUtils.extractSubConfig(pluginConfig,
-            org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, true);
+                org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KAFKA_CONFIG_PREFIX, true);
         Properties kafkaProperties = new Properties();
         kafkaConfig.entrySet().forEach(entry -> {
             kafkaProperties.put(entry.getKey(), entry.getValue().unwrapped());
@@ -124,4 +146,16 @@ public class KafkaSinkWriter implements SinkWriter<SeaTunnelRow, KafkaCommitInfo
         }
         return KafkaSemantics.NON;
     }
+
+    protected static String generateTransactionId(String transactionPrefix, long checkpointId) {
+        return transactionPrefix + "-" + checkpointId;
+    }
+
+    private void restoreState(List<KafkaSinkState> states) {
+        if (!states.isEmpty()) {
+            this.transactionPrefix = states.get(0).getTransactionIdPrefix();
+            this.lastCheckpointId = states.get(0).getCheckpointId();
+        }
+    }
+
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
index df4fc5b0..ee8762ac 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaTransactionSender.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
 
+import static org.apache.seatunnel.connectors.seatunnel.kafka.sink.KafkaSinkWriter.generateTransactionId;
+
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.kafka.state.KafkaSinkState;
 
@@ -41,14 +43,14 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTransactionSender.class);
 
-    private final KafkaProducer<K, V> kafkaProducer;
-    private final String transactionId;
+    private KafkaInternalProducer<K, V> kafkaProducer;
+    private String transactionId;
+    private final String transactionPrefix;
     private final Properties kafkaProperties;
 
-    public KafkaTransactionSender(Properties kafkaProperties) {
+    public KafkaTransactionSender(String transactionPrefix, Properties kafkaProperties) {
+        this.transactionPrefix = transactionPrefix;
         this.kafkaProperties = kafkaProperties;
-        this.transactionId = getTransactionId();
-        this.kafkaProducer = getTransactionProducer(kafkaProperties, transactionId);
     }
 
     @Override
@@ -57,15 +59,16 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
     }
 
     @Override
-    public void beginTransaction() {
+    public void beginTransaction(String transactionId) {
+        this.transactionId = transactionId;
+        this.kafkaProducer = getTransactionProducer(kafkaProperties, transactionId);
         kafkaProducer.beginTransaction();
     }
 
     @Override
     public Optional<KafkaCommitInfo> prepareCommit() {
-        // TODO kafka can't use transactionId to commit on different producer directly, we should find
-        //  another way
-        KafkaCommitInfo kafkaCommitInfo = new KafkaCommitInfo(transactionId, kafkaProperties);
+        KafkaCommitInfo kafkaCommitInfo = new KafkaCommitInfo(transactionId, kafkaProperties,
+                this.kafkaProducer.getProducerId(), this.kafkaProducer.getEpoch());
         return Optional.of(kafkaCommitInfo);
     }
 
@@ -75,24 +78,33 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
     }
 
     @Override
-    public void abortTransaction(List<KafkaSinkState> kafkaStates) {
-        if (kafkaStates.isEmpty()) {
-            return;
+    public void abortTransaction(long checkpointId) {
+
+        KafkaInternalProducer<K, V> producer;
+        if (this.kafkaProducer != null) {
+            producer = this.kafkaProducer;
+        } else {
+            producer = getTransactionProducer(this.kafkaProperties,
+                    generateTransactionId(this.transactionPrefix, checkpointId));
         }
-        for (KafkaSinkState kafkaState : kafkaStates) {
-            // create the transaction producer
+
+        for (long i = checkpointId; ; i++) {
+            String transactionId = generateTransactionId(this.transactionPrefix, i);
+            producer.setTransactionalId(transactionId);
             if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug("Abort kafka transaction: {}", kafkaState.getTransactionId());
+                LOGGER.debug("Abort kafka transaction: {}", transactionId);
+            }
+            producer.flush();
+            if (producer.getEpoch() == 0) {
+                break;
             }
-            KafkaProducer<K, V> historyProducer = getTransactionProducer(kafkaProperties, kafkaState.getTransactionId());
-            historyProducer.abortTransaction();
-            historyProducer.close();
         }
     }
 
     @Override
-    public List<KafkaSinkState> snapshotState() {
-        return Lists.newArrayList(new KafkaSinkState(transactionId, kafkaProperties));
+    public List<KafkaSinkState> snapshotState(long checkpointId) {
+        return Lists.newArrayList(new KafkaSinkState(transactionId, transactionPrefix, checkpointId,
+                kafkaProperties));
     }
 
     @Override
@@ -103,16 +115,12 @@ public class KafkaTransactionSender<K, V> implements KafkaProduceSender<K, V> {
         }
     }
 
-    private KafkaProducer<K, V> getTransactionProducer(Properties properties, String transactionId) {
+    private KafkaInternalProducer<K, V> getTransactionProducer(Properties properties, String transactionId) {
         Properties transactionProperties = (Properties) properties.clone();
         transactionProperties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
-        KafkaProducer<K, V> transactionProducer = new KafkaProducer<>(transactionProperties);
+        KafkaInternalProducer<K, V> transactionProducer = new KafkaInternalProducer<>(transactionProperties, transactionId);
         transactionProducer.initTransactions();
         return transactionProducer;
     }
 
-    // todo: use a better way to generate the transaction id
-    private String getTransactionId() {
-        return "SeaTunnel-" + System.currentTimeMillis();
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index fe652d25..90bdc2b7 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -135,7 +135,7 @@ public class KafkaSourceSplitEnumerator implements SourceSplitEnumerator<KafkaSo
             topics = Arrays.asList(this.metadata.getTopic().split(","));
         }
         Collection<TopicPartition> partitions =
-                adminClient.describeTopics(topics).allTopicNames().get().values().stream().flatMap(t -> t.partitions().stream()
+                adminClient.describeTopics(topics).all().get().values().stream().flatMap(t -> t.partitions().stream()
                         .map(p -> new TopicPartition(t.name(), p.partition()))).collect(Collectors.toSet());
         return adminClient.listOffsets(partitions.stream().collect(Collectors.toMap(p -> p, p -> OffsetSpec.latest())))
                 .all().get().entrySet().stream().map(partition -> {
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
index b1a032f7..20dd5ae9 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaCommitInfo.java
@@ -29,5 +29,7 @@ public class KafkaCommitInfo implements Serializable {
 
     private final String transactionId;
     private final Properties kafkaProperties;
+    private final long producerId;
+    private final short epoch;
 
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
index 311d1e29..99f8cef8 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connector-seatunnel-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/state/KafkaSinkState.java
@@ -28,6 +28,8 @@ import java.util.Properties;
 public class KafkaSinkState implements Serializable {
 
     private final String transactionId;
+    private final String transactionIdPrefix;
+    private final long checkpointId;
     private final Properties kafkaProperties;
 
 }