Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/12 01:50:29 UTC

[inlong] branch master updated: [INLONG-7783][Agent] Support sink data to Kafka (#7808)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 030c81822 [INLONG-7783][Agent] Support sink data to Kafka (#7808)
030c81822 is described below

commit 030c8182279df80412c630f37019984dae52182d
Author: wangpeix <10...@users.noreply.github.com>
AuthorDate: Wed Apr 12 09:50:23 2023 +0800

    [INLONG-7783][Agent] Support sink data to Kafka (#7808)
    
    Co-authored-by: wangpeix <lu...@didiglobal.com>
---
 .../inlong/agent/constant/AgentConstants.java      |  14 +
 .../apache/inlong/agent/pojo/JobProfileDto.java    |   3 +
 .../inlong/agent/plugin/sinks/KafkaSink.java       | 442 +++++++++++++++++++++
 .../inlong/agent/plugin/sinks/ProxySink.java       |   4 +-
 .../inlong/agent/plugin/sinks/KafkaSinkTest.java   |  68 ++++
 .../src/test/resources/kafkaSinkJob.json           |  15 +
 .../common/pojo/dataproxy/MQClusterInfo.java       |   2 +
 .../manager/common/consts/InlongConstants.java     |   2 +
 .../service/core/impl/AgentServiceImpl.java        |  10 +-
 9 files changed, 555 insertions(+), 5 deletions(-)

diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index eceb0dbf3..8baa32bc2 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -171,12 +171,26 @@ public class AgentConstants {
     public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
 
     public static final String PULSAR_CLIENT_PRODUCER_NUM = "agent.sink.pulsar.producer.num";
+    public static final String KAFKA_SINK_PRODUCER_NUM = "agent.sink.kafka.producer.num";
     public static final int DEFAULT_PRODUCER_NUM = 3;
 
     public static final String PULSAR_CLIENT_ENABLE_ASYNC_SEND = "agent.sink.pulsar.enbale.async.send";
+    public static final String KAFKA_PRODUCER_ENABLE_ASYNC_SEND = "agent.sink.kafka.enable.async.send";
     public static final boolean DEFAULT_ENABLE_ASYNC_SEND = true;
 
     public static final String PULSAR_SINK_SEND_QUEUE_SIZE = "agent.sink.pulsar.send.queue.size";
+    public static final String KAFKA_SINK_SEND_QUEUE_SIZE = "agent.sink.kafka.send.queue.size";
     public static final int DEFAULT_SEND_QUEUE_SIZE = 20000;
 
+    public static final String DEFAULT_KAFKA_SINK_SEND_ACKS = "1";
+    public static final long DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS = 3000;
+
+    public static final String DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE = "none";
+
+    public static final String DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER =
+            "org.apache.kafka.common.serialization.StringSerializer";
+
+    public static final String DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER =
+            "org.apache.kafka.common.serialization.ByteArraySerializer";
+
 }
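
For reference, a minimal sketch of how the new Kafka sink options might be set in the agent configuration (key names come from the constants above; the values shown are this patch's defaults, and using agent.properties as the carrier is an assumption):

    # number of Kafka producers created per cluster (DEFAULT_PRODUCER_NUM = 3)
    agent.sink.kafka.producer.num=3
    # send asynchronously; synchronous sends block up to 3000 ms (DEFAULT_ENABLE_ASYNC_SEND = true)
    agent.sink.kafka.enable.async.send=true
    # capacity of the in-memory send queue (DEFAULT_SEND_QUEUE_SIZE = 20000)
    agent.sink.kafka.send.queue.size=20000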
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 57cae7730..341ef2391 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -39,6 +39,7 @@ public class JobProfileDto {
     public static final String MANAGER_JOB = "MANAGER_JOB";
     public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
     public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink";
+    public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink";
 
     /**
      * file source
@@ -412,6 +413,8 @@ public class JobProfileDto {
             job.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
             if (mqType.equals(MQType.PULSAR)) {
                 job.setSink(PULSAR_SINK);
+            } else if (mqType.equals(MQType.KAFKA)) {
+                job.setSink(KAFKA_SINK);
             } else {
                 throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check");
             }
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
new file mode 100644
index 000000000..f9070e5fc
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.message.BatchProxyMessage;
+import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.PackProxyMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_ASYNC_SEND;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_ACKS;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PRODUCER_NUM;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_SEND_QUEUE_SIZE;
+import static org.apache.inlong.agent.constant.AgentConstants.KAFKA_PRODUCER_ENABLE_ASYNC_SEND;
+import static org.apache.inlong.agent.constant.AgentConstants.KAFKA_SINK_PRODUCER_NUM;
+import static org.apache.inlong.agent.constant.AgentConstants.KAFKA_SINK_SEND_QUEUE_SIZE;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+
+public class KafkaSink extends AbstractSink {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
+    private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+            60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("KafkaSink"));
+    private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+    private TaskPositionManager taskPositionManager;
+    private volatile boolean shutdown = false;
+
+    private List<MQClusterInfo> mqClusterInfos;
+    private String topic;
+    private List<KafkaSender> kafkaSenders;
+    private static final AtomicInteger KAFKA_SENDER_INDEX = new AtomicInteger(0);
+
+    private LinkedBlockingQueue<BatchProxyMessage> kafkaSendQueue;
+
+    private int producerNum;
+    private boolean asyncSend;
+
+    @Override
+    public void init(JobProfile jobConf) {
+        super.init(jobConf);
+        taskPositionManager = TaskPositionManager.getInstance();
+        int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE);
+        kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
+        producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM, DEFAULT_PRODUCER_NUM);
+        asyncSend = agentConf.getBoolean(KAFKA_PRODUCER_ENABLE_ASYNC_SEND, DEFAULT_ENABLE_ASYNC_SEND);
+
+        mqClusterInfos = jobConf.getMqClusters();
+        Preconditions.checkArgument(ObjectUtils.isNotEmpty(jobConf.getMqTopic()) && jobConf.getMqTopic().isValid(),
+                "no valid kafka topic config");
+        topic = jobConf.getMqTopic().getTopic();
+
+        kafkaSenders = new ArrayList<>();
+        initKafkaSender();
+        EXECUTOR_SERVICE.execute(sendDataThread());
+        EXECUTOR_SERVICE.execute(flushCache());
+    }
+
+    @Override
+    public void write(Message message) {
+        if (message == null || message instanceof EndMessage) {
+            return;
+        }
+
+        try {
+            ProxyMessage proxyMessage = new ProxyMessage(message);
+            // add proxy message to cache.
+            cache.compute(proxyMessage.getBatchKey(),
+                    (s, packProxyMessage) -> {
+                        if (packProxyMessage == null) {
+                            packProxyMessage =
+                                    new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
+                            packProxyMessage.generateExtraMap(proxyMessage.getDataKey());
+                            packProxyMessage.addTopicAndDataTime(topic, System.currentTimeMillis());
+                        }
+                        // add message to package proxy
+                        packProxyMessage.addProxyMessage(proxyMessage);
+                        return packProxyMessage;
+                    });
+            // increment the count of successful sinks
+            sinkMetric.sinkSuccessCount.incrementAndGet();
+        } catch (Exception e) {
+            sinkMetric.sinkFailCount.incrementAndGet();
+            LOGGER.error("write job[{}] data to cache error", jobInstanceId, e);
+        } catch (Throwable t) {
+            ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+        }
+    }
+
+    @Override
+    public void destroy() {
+        LOGGER.info("destroy job[{}] kafka sink", jobInstanceId);
+        while (!sinkFinish()) {
+            LOGGER.info("job[{}] wait until cache all data to kafka", jobInstanceId);
+            AgentUtils.silenceSleepInMs(batchFlushInterval);
+        }
+        shutdown = true;
+        if (CollectionUtils.isNotEmpty(kafkaSenders)) {
+            for (KafkaSender sender : kafkaSenders) {
+                sender.close();
+            }
+            kafkaSenders.clear();
+        }
+    }
+
+    private boolean sinkFinish() {
+        return cache.values().stream().allMatch(PackProxyMessage::isEmpty) && kafkaSendQueue.isEmpty();
+    }
+
+    /**
+     * flush cache by batch
+     *
+     * @return thread runner
+     */
+    private Runnable flushCache() {
+        return () -> {
+            LOGGER.info("start kafka sink flush cache thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
+            while (!shutdown) {
+                try {
+                    cache.forEach((batchKey, packProxyMessage) -> {
+                        BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
+                        if (batchProxyMessage == null) {
+                            return;
+                        }
+                        try {
+                            kafkaSendQueue.put(batchProxyMessage);
+                            if (LOGGER.isDebugEnabled()) {
+                                LOGGER.debug(
+                                        "send group id {}, message key {},with message size {}, the job id is {}, "
+                                                + "read source is {} sendTime is {}",
+                                        inlongGroupId, batchKey,
+                                        batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
+                                        batchProxyMessage.getDataTime());
+                            }
+                        } catch (Exception e) {
+                            LOGGER.error("flush job[{}] data to send queue exception", jobInstanceId, e);
+                        }
+                    });
+                    AgentUtils.silenceSleepInMs(batchFlushInterval);
+                } catch (Exception ex) {
+                    LOGGER.error("error caught", ex);
+                } catch (Throwable t) {
+                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+                }
+            }
+        };
+    }
+
+    /**
+     * take batchMsg from sendQueue and send to kafka
+     */
+    private Runnable sendDataThread() {
+        return () -> {
+            LOGGER.info("start kafka sink send data thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
+            while (!shutdown) {
+                try {
+                    BatchProxyMessage data = kafkaSendQueue.poll(1, TimeUnit.MILLISECONDS);
+                    if (ObjectUtils.isEmpty(data)) {
+                        continue;
+                    }
+                    sendData(data);
+                } catch (Throwable t) {
+                    LOGGER.error("send job[{}] data to kafka error", jobInstanceId, t);
+                }
+            }
+        };
+    }
+
+    private void sendData(BatchProxyMessage batchMsg) throws InterruptedException {
+        if (ObjectUtils.isEmpty(batchMsg)) {
+            return;
+        }
+
+        KafkaProducer<String, byte[]> producer = selectProducer();
+        if (ObjectUtils.isEmpty(producer)) {
+            kafkaSendQueue.put(batchMsg);
+            LOGGER.error("send job[{}] data err, empty kafka producer", jobInstanceId);
+            return;
+        }
+
+        ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, batchMsg.getInLongMsg().buildArray());
+        sinkMetric.pluginSendCount.addAndGet(batchMsg.getMsgCnt());
+        if (asyncSend) {
+            producer.send(record, new AsyncSinkCallback(System.currentTimeMillis(), batchMsg));
+        } else {
+            try {
+                Future<RecordMetadata> future = producer.send(record);
+                future.get(DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+                updateSuccessSendMetrics(batchMsg);
+            } catch (Exception e) {
+                sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+                LOGGER.error("send job[{}] data fail to kafka, add back to send queue, send queue size {}",
+                        jobInstanceId,
+                        kafkaSendQueue.size(), e);
+                kafkaSendQueue.put(batchMsg);
+            }
+        }
+    }
+
+    private void updateSuccessSendMetrics(BatchProxyMessage batchMsg) {
+        AuditUtils.add(AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS, batchMsg.getGroupId(),
+                batchMsg.getStreamId(), batchMsg.getDataTime(), batchMsg.getMsgCnt(),
+                batchMsg.getTotalSize());
+        sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
+        if (sourceName != null) {
+            taskPositionManager.updateSinkPosition(batchMsg, sourceName, batchMsg.getMsgCnt());
+        }
+    }
+
+    private KafkaProducer<String, byte[]> selectProducer() {
+        if (CollectionUtils.isEmpty(kafkaSenders)) {
+            LOGGER.error("send job[{}] data err, empty kafka sender", jobInstanceId);
+            return null;
+        }
+        KafkaSender sender = kafkaSenders.get(
+                (KAFKA_SENDER_INDEX.getAndIncrement() & Integer.MAX_VALUE) % kafkaSenders.size());
+        return sender.getProducer();
+    }
+
+    private void initKafkaSender() {
+        if (CollectionUtils.isEmpty(mqClusterInfos)) {
+            LOGGER.error("init job[{}] kafka producer fail, empty mqCluster info", jobInstanceId);
+            return;
+        }
+        for (MQClusterInfo clusterInfo : mqClusterInfos) {
+            if (!clusterInfo.isValid()) {
+                continue;
+            }
+            kafkaSenders.add(new KafkaSender(clusterInfo, producerNum));
+        }
+    }
+
+    class KafkaSender {
+
+        private final Properties kafkaProps = new Properties();
+        private List<KafkaProducer<String, byte[]>> producers;
+        private final AtomicInteger producerIndex = new AtomicInteger(0);
+
+        public KafkaSender(MQClusterInfo clusterInfo, int producerNum) {
+            setKafkaProps(clusterInfo);
+            initKafkaProducer(kafkaProps, producerNum);
+        }
+
+        private void setKafkaProps(MQClusterInfo clusterInfo) {
+            kafkaProps.clear();
+            Map<String, String> params = clusterInfo.getParams();
+
+            // set bootstrap server
+            String bootStrapServers = params.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+            if (bootStrapServers == null) {
+                throw new IllegalArgumentException("kafka param bootstrap.servers is null");
+            }
+            kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+
+            // set ack
+            String acks = params.get(ProducerConfig.ACKS_CONFIG);
+            if (StringUtils.isNotEmpty(acks)) {
+                kafkaProps.put(ProducerConfig.ACKS_CONFIG, acks);
+            } else {
+                kafkaProps.put(ProducerConfig.ACKS_CONFIG, DEFAULT_KAFKA_SINK_SEND_ACKS);
+            }
+
+            // set compression
+            String compressionType = params.get(ProducerConfig.COMPRESSION_TYPE_CONFIG);
+            if (StringUtils.isNotEmpty(compressionType)) {
+                kafkaProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
+            } else {
+                kafkaProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE);
+            }
+
+            // set serializer
+            String keySerializer = params.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
+            if (StringUtils.isNotEmpty(keySerializer)) {
+                kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
+            } else {
+                kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER);
+            }
+
+            String valueSerializer = params.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
+            if (StringUtils.isNotEmpty(valueSerializer)) {
+                kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
+            } else {
+                kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER);
+            }
+
+            // set linger
+            String lingerMs = params.get(ProducerConfig.LINGER_MS_CONFIG);
+            if (StringUtils.isNotEmpty(lingerMs)) {
+                kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMs));
+            }
+
+            // set batch size
+            String batchSize = params.get(ProducerConfig.BATCH_SIZE_CONFIG);
+            if (StringUtils.isNotEmpty(batchSize)) {
+                kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSize));
+            }
+
+            // set buffer memory
+            String bufferMemory = params.get(ProducerConfig.BUFFER_MEMORY_CONFIG);
+            if (StringUtils.isNotEmpty(bufferMemory)) {
+                kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Integer.parseInt(bufferMemory));
+            }
+
+            // set authentication
+            String securityProtocol = params.get(SECURITY_PROTOCOL_CONFIG);
+            if (StringUtils.isNotEmpty(securityProtocol)) {
+                kafkaProps.put(SECURITY_PROTOCOL_CONFIG, securityProtocol);
+            }
+
+            String saslMechanism = params.get(SASL_MECHANISM);
+            if (StringUtils.isNotEmpty(saslMechanism)) {
+                kafkaProps.put(SASL_MECHANISM, saslMechanism);
+            }
+
+            String saslJaasConfig = params.get(SASL_JAAS_CONFIG);
+            if (StringUtils.isNotEmpty(saslJaasConfig)) {
+                kafkaProps.put(SASL_JAAS_CONFIG, saslJaasConfig);
+            }
+        }
+
+        private void initKafkaProducer(Properties kafkaProps, int producerNum) {
+            producers = new ArrayList<>(producerNum);
+            for (int i = 0; i < producerNum; i++) {
+                producers.add(new KafkaProducer<>(kafkaProps));
+            }
+        }
+
+        public KafkaProducer<String, byte[]> getProducer() {
+            if (CollectionUtils.isEmpty(producers)) {
+                LOGGER.error("job[{}] empty producers", jobInstanceId);
+                return null;
+            }
+            int index = (producerIndex.getAndIncrement() & Integer.MAX_VALUE) % producers.size();
+            return producers.get(index);
+        }
+
+        /**
+         * close all kafka producer
+         */
+        public void close() {
+            if (CollectionUtils.isEmpty(producers)) {
+                return;
+            }
+
+            for (KafkaProducer<String, byte[]> producer : producers) {
+                producer.close();
+            }
+        }
+    }
+
+    class AsyncSinkCallback implements Callback {
+
+        private long startTime;
+        private BatchProxyMessage batchMsg;
+
+        public AsyncSinkCallback(long startTime, BatchProxyMessage batchMsg) {
+            this.startTime = startTime;
+            this.batchMsg = batchMsg;
+        }
+
+        @Override
+        public void onCompletion(RecordMetadata metadata, Exception exception) {
+            if (exception != null) {
+                sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+                LOGGER.error("send job[{}] data fail to kafka, will add back to sendqueue, current sendqueue size {}",
+                        jobInstanceId,
+                        kafkaSendQueue.size(), exception);
+                try {
+                    kafkaSendQueue.put(batchMsg);
+                } catch (InterruptedException ex) {
+                    LOGGER.error("put job[{}] data back to queue fail, send queue size {}", jobInstanceId,
+                            kafkaSendQueue.size(), ex);
+                }
+            } else {
+                updateSuccessSendMetrics(batchMsg);
+            }
+
+            if (LOGGER.isDebugEnabled()) {
+                long elapsedTime = System.currentTimeMillis() - startTime;
+                if (metadata != null) {
+                    LOGGER.debug("acked job[{}] message partition:{} ofset:{}", jobInstanceId, metadata.partition(),
+                            metadata.offset());
+                }
+                LOGGER.debug("job[{}] send data to kafka elapsed time: {}", jobInstanceId, elapsedTime);
+            }
+        }
+    }
+}
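
The sink's send path round-robins over one KafkaSender per valid MQ cluster and supports both an async mode (callback-driven, the default) and a sync mode that blocks on the returned future for up to 3000 ms; failed batches are put back on the send queue. Below is a standalone sketch of that sync/async pattern against the plain kafka-clients API, using the serializer and acks defaults this patch defines; the broker address and topic name are placeholders, not values from the commit:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    import java.nio.charset.StandardCharsets;
    import java.util.Properties;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;

    public class KafkaSendSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // placeholder broker; the sink reads this from MQClusterInfo params
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // defaults applied by the sink when cluster params do not override them
            props.put(ProducerConfig.ACKS_CONFIG, "1");
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");

            boolean asyncSend = true; // mirrors agent.sink.kafka.enable.async.send
            try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(
                        "demo-topic", "hello".getBytes(StandardCharsets.UTF_8));
                if (asyncSend) {
                    // async path: the callback decides between success accounting and re-queueing
                    producer.send(record, (RecordMetadata metadata, Exception e) -> {
                        if (e != null) {
                            System.err.println("send failed, batch would be re-queued: " + e);
                        } else {
                            System.out.printf("acked partition %d offset %d%n",
                                    metadata.partition(), metadata.offset());
                        }
                    });
                } else {
                    // sync path: block with the same 3000 ms timeout the sink uses
                    Future<RecordMetadata> future = producer.send(record);
                    future.get(3000, TimeUnit.MILLISECONDS);
                }
            }
        }
    }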
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 5050e39dd..090ab2c33 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -82,12 +82,10 @@ public class ProxySink extends AbstractSink {
                             });
                     // increment the count of successful sinks
                     sinkMetric.sinkSuccessCount.incrementAndGet();
-                } else {
-                    // increment the count of failed sinks
-                    sinkMetric.sinkFailCount.incrementAndGet();
                 }
             }
         } catch (Exception e) {
+            sinkMetric.sinkFailCount.incrementAndGet();
             LOGGER.error("write message to Proxy sink error", e);
         } catch (Throwable t) {
             ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
new file mode 100644
index 000000000..83030dc81
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.MiniAgent;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static org.junit.Assert.assertEquals;
+
+public class KafkaSinkTest {
+
+    private static MockSink kafkaSink;
+    private static JobProfile jobProfile;
+    private static AgentBaseTestsHelper helper;
+    private static MiniAgent agent;
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        helper = new AgentBaseTestsHelper(KafkaSinkTest.class.getName()).setupAgentHome();
+        agent = new MiniAgent();
+        jobProfile = JobProfile.parseJsonFile("kafkaSinkJob.json");
+        jobProfile.set("job.mqClusters",
+                "[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"KAFKA\",\"params\":{}}]");
+        jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}");
+        System.out.println(jobProfile.toJsonStr());
+        kafkaSink = new MockSink();
+        kafkaSink.init(jobProfile);
+    }
+
+    @Test
+    public void testWrite() {
+        String body = "testMesage";
+        Map<String, String> attr = new HashMap<>();
+        attr.put(PROXY_KEY_GROUP_ID, "groupId");
+        attr.put(PROXY_KEY_STREAM_ID, "streamId");
+        long count = 5;
+        for (long i = 0; i < 5; i++) {
+            kafkaSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr));
+        }
+        assertEquals(kafkaSink.sinkMetric.sinkSuccessCount.get(), count);
+    }
+
+}
diff --git a/inlong-agent/agent-plugins/src/test/resources/kafkaSinkJob.json b/inlong-agent/agent-plugins/src/test/resources/kafkaSinkJob.json
new file mode 100644
index 000000000..4f5110439
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/kafkaSinkJob.json
@@ -0,0 +1,15 @@
+{
+  "job": {
+    "id": 1,
+    "instance.id": "job_1",
+    "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
+    "sink": "org.apache.inlong.agent.plugin.sinks.KafkaSink",
+    "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
+    "groupId": "groupId",
+    "streamId": "streamId"
+  },
+  "proxy": {
+    "inlongGroupId": "groupId",
+    "inlongStreamId": "streamId"
+  }
+}
\ No newline at end of file
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
index 4b94d8391..114c7db62 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
@@ -17,6 +17,7 @@
 
 package org.apache.inlong.common.pojo.dataproxy;
 
+import lombok.ToString;
 import org.apache.commons.lang3.StringUtils;
 
 import java.util.HashMap;
@@ -25,6 +26,7 @@ import java.util.Map;
 /**
  * MQ cluster info.
  */
+@ToString
 public class MQClusterInfo {
 
     private String url;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 187b92bff..f83a559fc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -40,6 +40,8 @@ public class InlongConstants {
      */
     public static final String COMMA = ",";
 
+    public static final String DOT = ".";
+
     public static final String BLANK = " ";
 
     public static final String SLASH = "/";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index d644bb8d8..b3a2d7e92 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -84,6 +84,8 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
+
 /**
  * Agent service layer implementation
  */
@@ -476,9 +478,8 @@ public class AgentServiceImpl implements AgentService {
                 // add mq cluster setting
                 List<MQClusterInfo> mqSet = new ArrayList<>();
                 List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
-                List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
                 ClusterPageRequest pageRequest = ClusterPageRequest.builder()
-                        .typeList(typeList)
+                        .type(groupEntity.getMqType())
                         .clusterTagList(clusterTagList)
                         .build();
                 List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
@@ -518,6 +519,11 @@ public class AgentServiceImpl implements AgentService {
                     topicConfig.setInlongGroupId(groupId);
                     topicConfig.setTopic(mqResource);
                     dataConfig.setTopicInfo(topicConfig);
+                } else if (MQType.KAFKA.equals(mqType)) {
+                    DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+                    topicConfig.setInlongGroupId(groupId);
+                    topicConfig.setTopic(groupEntity.getMqResource() + DOT + streamEntity.getMqResource());
+                    dataConfig.setTopicInfo(topicConfig);
                 }
             } else {
                 LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);