You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2023/04/12 01:50:29 UTC
[inlong] branch master updated: [INLONG-7783][Agent] Support sink data to Kafka (#7808)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 030c81822 [INLONG-7783][Agent] Support sink data to Kafka (#7808)
030c81822 is described below
commit 030c8182279df80412c630f37019984dae52182d
Author: wangpeix <10...@users.noreply.github.com>
AuthorDate: Wed Apr 12 09:50:23 2023 +0800
[INLONG-7783][Agent] Support sink data to Kafka (#7808)
Co-authored-by: wangpeix <lu...@didiglobal.com>
---
.../inlong/agent/constant/AgentConstants.java | 14 +
.../apache/inlong/agent/pojo/JobProfileDto.java | 3 +
.../inlong/agent/plugin/sinks/KafkaSink.java | 442 +++++++++++++++++++++
.../inlong/agent/plugin/sinks/ProxySink.java | 4 +-
.../inlong/agent/plugin/sinks/KafkaSinkTest.java | 68 ++++
.../src/test/resources/kafkaSinkJob.json | 15 +
.../common/pojo/dataproxy/MQClusterInfo.java | 2 +
.../manager/common/consts/InlongConstants.java | 2 +
.../service/core/impl/AgentServiceImpl.java | 10 +-
9 files changed, 555 insertions(+), 5 deletions(-)
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
index eceb0dbf3..8baa32bc2 100755
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/AgentConstants.java
@@ -171,12 +171,26 @@ public class AgentConstants {
public static final String DEFAULT_COMPRESSION_TYPE = "NONE";
public static final String PULSAR_CLIENT_PRODUCER_NUM = "agent.sink.pulsar.producer.num";
+ public static final String KAFKA_SINK_PRODUCER_NUM = "agent.sink.kafka.producer.num";
public static final int DEFAULT_PRODUCER_NUM = 3;
public static final String PULSAR_CLIENT_ENABLE_ASYNC_SEND = "agent.sink.pulsar.enbale.async.send";
+ public static final String KAFKA_PRODUCER_ENABLE_ASYNC_SEND = "agent.sink.kafka.enbale.async.send";
public static final boolean DEFAULT_ENABLE_ASYNC_SEND = true;
public static final String PULSAR_SINK_SEND_QUEUE_SIZE = "agent.sink.pulsar.send.queue.size";
+ public static final String KAFKA_SINK_SEND_QUEUE_SIZE = "agent.sink.kafka.send.queue.size";
public static final int DEFAULT_SEND_QUEUE_SIZE = 20000;
+ public static final String DEFAULT_KAFKA_SINK_SEND_ACKS = "1";
+ public static final long DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS = 3000;
+
+ public static final String DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE = "none";
+
+ public static final String DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER =
+ "org.apache.kafka.common.serialization.StringSerializer";
+
+ public static final String DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER =
+ "org.apache.kafka.common.serialization.ByteArraySerializer";
+
}
diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
index 57cae7730..341ef2391 100644
--- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
+++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/JobProfileDto.java
@@ -39,6 +39,7 @@ public class JobProfileDto {
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATAPROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
public static final String PULSAR_SINK = "org.apache.inlong.agent.plugin.sinks.PulsarSink";
+ public static final String KAFKA_SINK = "org.apache.inlong.agent.plugin.sinks.KafkaSink";
/**
* file source
@@ -412,6 +413,8 @@ public class JobProfileDto {
job.setTopicInfo(GSON.toJson(dataConfig.getTopicInfo()));
if (mqType.equals(MQType.PULSAR)) {
job.setSink(PULSAR_SINK);
+ } else if (mqType.equals(MQType.KAFKA)) {
+ job.setSink(KAFKA_SINK);
} else {
throw new IllegalArgumentException("input dataConfig" + dataConfig + "is invalid please check");
}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
new file mode 100644
index 000000000..f9070e5fc
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/KafkaSink.java
@@ -0,0 +1,442 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.agent.common.AgentThreadFactory;
+import org.apache.inlong.agent.conf.AgentConfiguration;
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.core.task.TaskPositionManager;
+import org.apache.inlong.agent.message.BatchProxyMessage;
+import org.apache.inlong.agent.message.EndMessage;
+import org.apache.inlong.agent.message.PackProxyMessage;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.metrics.audit.AuditUtils;
+import org.apache.inlong.agent.plugin.Message;
+import org.apache.inlong.agent.utils.AgentUtils;
+import org.apache.inlong.agent.utils.ThreadUtils;
+import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_ENABLE_ASYNC_SEND;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_ACKS;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_PRODUCER_NUM;
+import static org.apache.inlong.agent.constant.AgentConstants.DEFAULT_SEND_QUEUE_SIZE;
+import static org.apache.inlong.agent.constant.AgentConstants.KAFKA_PRODUCER_ENABLE_ASYNC_SEND;
+import static org.apache.inlong.agent.constant.AgentConstants.KAFKA_SINK_PRODUCER_NUM;
+import static org.apache.inlong.agent.constant.AgentConstants.KAFKA_SINK_SEND_QUEUE_SIZE;
+import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_JAAS_CONFIG;
+import static org.apache.kafka.common.config.SaslConfigs.SASL_MECHANISM;
+
+public class KafkaSink extends AbstractSink {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSink.class);
+ private static final ExecutorService EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
+ 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), new AgentThreadFactory("KafkaSink"));
+ private final AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
+ private TaskPositionManager taskPositionManager;
+ private volatile boolean shutdown = false;
+
+ private List<MQClusterInfo> mqClusterInfos;
+ private String topic;
+ private List<KafkaSender> kafkaSenders;
+ private static final AtomicInteger KAFKA_SENDER_INDEX = new AtomicInteger(0);
+
+ private LinkedBlockingQueue<BatchProxyMessage> kafkaSendQueue;
+
+ private int producerNum;
+ private boolean asyncSend;
+
+ /**
+ * Prepares the sink for one job: reads queue size, producer count and send mode from the agent
+ * configuration, takes the MQ clusters and topic from the job profile, creates the Kafka senders and
+ * submits the send loop and the cache-flush loop to the shared executor.
+ *
+ * @param jobConf job profile that supplies the MQ cluster list and the MQ topic
+ * @throws IllegalArgumentException if the job profile carries no valid MQ topic (Preconditions check)
+ */
+ @Override
+ public void init(JobProfile jobConf) {
+ super.init(jobConf);
+ taskPositionManager = TaskPositionManager.getInstance();
+ // capacity-bounded queue that sits between the flush loop and the send loop
+ int sendQueueSize = agentConf.getInt(KAFKA_SINK_SEND_QUEUE_SIZE, DEFAULT_SEND_QUEUE_SIZE);
+ kafkaSendQueue = new LinkedBlockingQueue<>(sendQueueSize);
+ producerNum = agentConf.getInt(KAFKA_SINK_PRODUCER_NUM, DEFAULT_PRODUCER_NUM);
+ asyncSend = agentConf.getBoolean(KAFKA_PRODUCER_ENABLE_ASYNC_SEND, DEFAULT_ENABLE_ASYNC_SEND);
+
+ mqClusterInfos = jobConf.getMqClusters();
+ Preconditions.checkArgument(ObjectUtils.isNotEmpty(jobConf.getMqTopic()) && jobConf.getMqTopic().isValid(),
+ "no valid kafka topic config");
+ topic = jobConf.getMqTopic().getTopic();
+
+ // senders must exist before the loops below start pulling from the queue
+ kafkaSenders = new ArrayList<>();
+ initKafkaSender();
+ EXECUTOR_SERVICE.execute(sendDataThread());
+ EXECUTOR_SERVICE.execute(flushCache());
+ }
+
+ /**
+ * Buffers one message in the per-batch-key cache; the flush loop later turns the cache into batches.
+ * A null message or an EndMessage is ignored.
+ *
+ * @param message message handed over by the channel
+ */
+ @Override
+ public void write(Message message) {
+ if (message == null) {
+ return;
+ }
+ if (message instanceof EndMessage) {
+ return;
+ }
+
+ try {
+ final ProxyMessage wrapped = new ProxyMessage(message);
+ cache.compute(wrapped.getBatchKey(), (key, existing) -> {
+ PackProxyMessage pack = existing;
+ if (pack == null) {
+ // first message for this batch key: create and prime the pack
+ pack = new PackProxyMessage(jobInstanceId, jobConf, inlongGroupId, inlongStreamId);
+ pack.generateExtraMap(wrapped.getDataKey());
+ pack.addTopicAndDataTime(topic, System.currentTimeMillis());
+ }
+ pack.addProxyMessage(wrapped);
+ return pack;
+ });
+ // the message reached the cache, count it as a successful sink
+ sinkMetric.sinkSuccessCount.incrementAndGet();
+ } catch (Exception e) {
+ sinkMetric.sinkFailCount.incrementAndGet();
+ LOGGER.error("write job[{}] data to cache error", jobInstanceId, e);
+ } catch (Throwable t) {
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+ }
+ }
+
+ /**
+ * Waits until the cache and the send queue are both empty, then raises the shutdown flag that ends
+ * the background loops and closes every sender.
+ */
+ @Override
+ public void destroy() {
+ LOGGER.info("destroy job[{}] kafka sink", jobInstanceId);
+ while (!sinkFinish()) {
+ LOGGER.info("job[{}] wait until cache all data to kafka", jobInstanceId);
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
+ }
+ shutdown = true;
+
+ if (CollectionUtils.isEmpty(kafkaSenders)) {
+ return;
+ }
+ kafkaSenders.forEach(KafkaSender::close);
+ kafkaSenders.clear();
+ }
+
+ /**
+ * @return true when every cached pack is empty and the send queue is empty
+ */
+ private boolean sinkFinish() {
+ for (PackProxyMessage pending : cache.values()) {
+ if (!pending.isEmpty()) {
+ return false;
+ }
+ }
+ return kafkaSendQueue.isEmpty();
+ }
+
+ /**
+ * Builds the flush loop: until shutdown is set, each pass asks every cached PackProxyMessage for a
+ * batch, puts any batch obtained onto the send queue, and then sleeps for batchFlushInterval.
+ *
+ * @return the runnable that init() submits to the executor
+ */
+ private Runnable flushCache() {
+ return () -> {
+ LOGGER.info("start kafka sink flush cache thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
+ while (!shutdown) {
+ try {
+ cache.forEach((batchKey, packProxyMessage) -> {
+ BatchProxyMessage batchProxyMessage = packProxyMessage.fetchBatch();
+ // nothing to send for this batch key in this pass
+ if (batchProxyMessage == null) {
+ return;
+ }
+ try {
+ // blocks while the bounded send queue is full
+ kafkaSendQueue.put(batchProxyMessage);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(
+ "send group id {}, message key {},with message size {}, the job id is {}, "
+ + "read source is {} sendTime is {}",
+ inlongGroupId, batchKey,
+ batchProxyMessage.getDataList().size(), jobInstanceId, sourceName,
+ batchProxyMessage.getDataTime());
+ }
+ } catch (Exception e) {
+ // NOTE(review): an InterruptedException from put() lands here too; the fetched batch is
+ // then neither queued nor retried by this block - confirm that is acceptable
+ LOGGER.error("flush job[{}] data to send queue exception", jobInstanceId, e);
+ }
+ });
+ AgentUtils.silenceSleepInMs(batchFlushInterval);
+ } catch (Exception ex) {
+ LOGGER.error("error caught", ex);
+ } catch (Throwable t) {
+ ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
+ }
+ }
+ };
+ }
+
+ /**
+ * Builds the send loop: until shutdown is set it polls the send queue with a 1 ms timeout and passes
+ * every batch it receives to sendData; any failure is logged and the loop carries on.
+ *
+ * @return the runnable that init() submits to the executor
+ */
+ private Runnable sendDataThread() {
+ return () -> {
+ LOGGER.info("start kafka sink send data thread, job[{}], groupId[{}]", jobInstanceId, inlongGroupId);
+ while (!shutdown) {
+ try {
+ BatchProxyMessage next = kafkaSendQueue.poll(1, TimeUnit.MILLISECONDS);
+ if (ObjectUtils.isNotEmpty(next)) {
+ sendData(next);
+ }
+ } catch (Throwable t) {
+ LOGGER.error("send job[{}] data to kafka error", jobInstanceId, t);
+ }
+ }
+ };
+ }
+
+ /**
+ * Sends one batch to Kafka through the next producer in rotation. In async mode the outcome is handled
+ * by AsyncSinkCallback; in sync mode the call waits up to DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS for
+ * the result and puts the batch back onto the send queue when the send fails.
+ *
+ * @param batchMsg batch to send; an empty/null batch is ignored
+ * @throws InterruptedException if the thread is interrupted while putting the batch back on the queue
+ */
+ private void sendData(BatchProxyMessage batchMsg) throws InterruptedException {
+ if (ObjectUtils.isEmpty(batchMsg)) {
+ return;
+ }
+
+ KafkaProducer<String, byte[]> producer = selectProducer();
+ if (ObjectUtils.isEmpty(producer)) {
+ // no producer available: keep the batch by putting it back on the queue
+ kafkaSendQueue.put(batchMsg);
+ LOGGER.error("send job[{}] data err, empty kafka producer", jobInstanceId);
+ return;
+ }
+
+ ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, batchMsg.getInLongMsg().buildArray());
+ // counted per attempt, so a batch that is re-queued and re-sent is counted again
+ sinkMetric.pluginSendCount.addAndGet(batchMsg.getMsgCnt());
+ if (asyncSend) {
+ producer.send(record, new AsyncSinkCallback(System.currentTimeMillis(), batchMsg));
+ } else {
+ try {
+ Future<RecordMetadata> future = producer.send(record);
+ future.get(DEFAULT_KAFKA_SINK_SYNC_SEND_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ updateSuccessSendMetrics(batchMsg);
+ } catch (Exception e) {
+ // NOTE(review): this also catches an InterruptedException thrown by future.get() without
+ // restoring the interrupt flag - confirm whether that is intended
+ sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+ LOGGER.error("send job[{}] data fail to kafka, add back to send queue, send queue size {}",
+ jobInstanceId,
+ kafkaSendQueue.size(), e);
+ // NOTE(review): put() blocks while the bounded queue is full; presumably this method runs on the
+ // send loop that drains the queue - verify this cannot stall
+ kafkaSendQueue.put(batchMsg);
+ }
+ }
+ }
+
+ /**
+ * Records a successfully sent batch: adds an audit entry, bumps the plugin success counter and, when a
+ * source name is known, updates the sink position for that source.
+ *
+ * @param batchMsg batch that was sent successfully
+ */
+ private void updateSuccessSendMetrics(BatchProxyMessage batchMsg) {
+ AuditUtils.add(
+ AuditUtils.AUDIT_ID_AGENT_SEND_SUCCESS,
+ batchMsg.getGroupId(),
+ batchMsg.getStreamId(),
+ batchMsg.getDataTime(),
+ batchMsg.getMsgCnt(),
+ batchMsg.getTotalSize());
+ sinkMetric.pluginSendSuccessCount.addAndGet(batchMsg.getMsgCnt());
+ if (sourceName == null) {
+ return;
+ }
+ taskPositionManager.updateSinkPosition(batchMsg, sourceName, batchMsg.getMsgCnt());
+ }
+
+ /**
+ * Picks the next sender in rotation and returns one of its producers.
+ *
+ * @return a producer, or null when there is no sender or the chosen sender has no producer
+ */
+ private KafkaProducer<String, byte[]> selectProducer() {
+ if (CollectionUtils.isNotEmpty(kafkaSenders)) {
+ // masking with Integer.MAX_VALUE keeps the slot non-negative once the counter overflows
+ int ticket = KAFKA_SENDER_INDEX.getAndIncrement() & Integer.MAX_VALUE;
+ KafkaSender chosen = kafkaSenders.get(ticket % kafkaSenders.size());
+ return chosen.getProducer();
+ }
+ LOGGER.error("send job[{}] data err, empty kafka sender", jobInstanceId);
+ return null;
+ }
+
+ /**
+ * Creates one KafkaSender for every valid MQ cluster of the job; logs an error and creates nothing
+ * when the job has no MQ cluster info.
+ */
+ private void initKafkaSender() {
+ if (CollectionUtils.isEmpty(mqClusterInfos)) {
+ LOGGER.error("init job[{}] kafka producer fail, empty mqCluster info", jobInstanceId);
+ return;
+ }
+ mqClusterInfos.stream()
+ .filter(MQClusterInfo::isValid)
+ .map(validCluster -> new KafkaSender(validCluster, producerNum))
+ .forEach(kafkaSenders::add);
+ }
+
+ /**
+ * Owns the Kafka producers created for one MQ cluster and hands them out in round-robin order.
+ */
+ class KafkaSender {
+
+ private final Properties kafkaProps = new Properties();
+ private List<KafkaProducer<String, byte[]>> producers;
+ private final AtomicInteger producerIndex = new AtomicInteger(0);
+
+ /**
+ * Builds the producer properties from the cluster parameters and creates the producers.
+ *
+ * @param clusterInfo cluster description whose params map carries the Kafka producer settings
+ * @param producerNum number of producers to create for this cluster
+ * @throws IllegalArgumentException if the params carry no bootstrap.servers entry
+ * @throws NumberFormatException if linger.ms, batch.size or buffer.memory is not a valid number
+ */
+ public KafkaSender(MQClusterInfo clusterInfo, int producerNum) {
+ setKafkaProps(clusterInfo);
+ initKafkaProducer(kafkaProps, producerNum);
+ }
+
+ /**
+ * Fills kafkaProps from the cluster params: bootstrap servers are mandatory; acks, compression and
+ * the two serializers fall back to the agent defaults; the remaining settings are copied only when
+ * they are configured.
+ */
+ private void setKafkaProps(MQClusterInfo clusterInfo) {
+ kafkaProps.clear();
+ Map<String, String> params = clusterInfo.getParams();
+
+ // set bootstrap server; a missing params map is reported like a missing entry
+ String bootStrapServers = params == null ? null : params.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG);
+ if (bootStrapServers == null) {
+ throw new IllegalArgumentException("kafka param bootstrap.servers is null");
+ }
+ kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
+
+ // set ack
+ putOrDefault(params, ProducerConfig.ACKS_CONFIG, DEFAULT_KAFKA_SINK_SEND_ACKS);
+
+ // set compression
+ putOrDefault(params, ProducerConfig.COMPRESSION_TYPE_CONFIG, DEFAULT_KAFKA_SINK_SEND_COMPRESSION_TYPE);
+
+ // set serializer; each serializer is decided by its own configured value
+ putOrDefault(params, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, DEFAULT_KAFKA_SINK_SEND_KEY_SERIALIZER);
+ putOrDefault(params, ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ DEFAULT_KAFKA_SINK_SEND_VALUE_SERIALIZER);
+
+ // set linger
+ String lingerMs = params.get(ProducerConfig.LINGER_MS_CONFIG);
+ if (StringUtils.isNotEmpty(lingerMs)) {
+ kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, Integer.parseInt(lingerMs));
+ }
+
+ // set batch size
+ String batchSize = params.get(ProducerConfig.BATCH_SIZE_CONFIG);
+ if (StringUtils.isNotEmpty(batchSize)) {
+ kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, Integer.parseInt(batchSize));
+ }
+
+ // set buffer memory; guarded by its own value and parsed as long, since the setting is long-valued
+ String bufferMemory = params.get(ProducerConfig.BUFFER_MEMORY_CONFIG);
+ if (StringUtils.isNotEmpty(bufferMemory)) {
+ kafkaProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, Long.parseLong(bufferMemory));
+ }
+
+ // set authentication
+ putIfPresent(params, SECURITY_PROTOCOL_CONFIG);
+ putIfPresent(params, SASL_MECHANISM);
+ putIfPresent(params, SASL_JAAS_CONFIG);
+ }
+
+ /**
+ * Copies params[key] into kafkaProps, or stores defaultValue when the entry is missing or empty.
+ */
+ private void putOrDefault(Map<String, String> params, String key, String defaultValue) {
+ String configured = params.get(key);
+ kafkaProps.put(key, StringUtils.isNotEmpty(configured) ? configured : defaultValue);
+ }
+
+ /**
+ * Copies params[key] into kafkaProps only when the entry is present and not empty.
+ */
+ private void putIfPresent(Map<String, String> params, String key) {
+ String configured = params.get(key);
+ if (StringUtils.isNotEmpty(configured)) {
+ kafkaProps.put(key, configured);
+ }
+ }
+
+ /**
+ * Creates producerNum producers that all share the given properties.
+ */
+ private void initKafkaProducer(Properties kafkaProps, int producerNum) {
+ producers = new ArrayList<>(producerNum);
+ for (int i = 0; i < producerNum; i++) {
+ producers.add(new KafkaProducer<>(kafkaProps));
+ }
+ }
+
+ /**
+ * Returns the next producer in round-robin order.
+ *
+ * @return a producer, or null when this sender holds no producers
+ */
+ public KafkaProducer<String, byte[]> getProducer() {
+ if (CollectionUtils.isEmpty(producers)) {
+ LOGGER.error("job[{}] empty producers", jobInstanceId);
+ return null;
+ }
+ // masking with Integer.MAX_VALUE keeps the index non-negative once the counter overflows
+ int index = (producerIndex.getAndIncrement() & Integer.MAX_VALUE) % producers.size();
+ return producers.get(index);
+ }
+
+ /**
+ * close all kafka producer
+ */
+ public void close() {
+ if (CollectionUtils.isEmpty(producers)) {
+ return;
+ }
+
+ for (KafkaProducer<String, byte[]> producer : producers) {
+ producer.close();
+ }
+ }
+ }
+
+ /**
+ * Completion callback for asynchronous sends: on failure it counts the failed messages and puts the
+ * batch back onto the send queue; on success it records the success metrics.
+ */
+ class AsyncSinkCallback implements Callback {
+
+ private final long startTime;
+ private final BatchProxyMessage batchMsg;
+
+ /**
+ * @param startTime time in milliseconds at which the send was issued, used for the elapsed-time log
+ * @param batchMsg batch this callback reports on
+ */
+ public AsyncSinkCallback(long startTime, BatchProxyMessage batchMsg) {
+ this.startTime = startTime;
+ this.batchMsg = batchMsg;
+ }
+
+ /**
+ * @param metadata metadata of the acknowledged record; may be null
+ * @param exception failure reported for the send, or null when the send succeeded
+ */
+ @Override
+ public void onCompletion(RecordMetadata metadata, Exception exception) {
+ if (exception != null) {
+ sinkMetric.pluginSendFailCount.addAndGet(batchMsg.getMsgCnt());
+ LOGGER.error("send job[{}] data fail to kafka, will add back to sendqueue, current sendqueue size {}",
+ jobInstanceId,
+ kafkaSendQueue.size(), exception);
+ try {
+ // NOTE(review): put() blocks while the bounded queue is full - confirm that blocking the
+ // thread that runs producer callbacks is acceptable
+ kafkaSendQueue.put(batchMsg);
+ } catch (InterruptedException ex) {
+ // restore the interrupt flag so the interrupted thread can still observe it
+ Thread.currentThread().interrupt();
+ LOGGER.error("put job[{}] data back to queue fail, send queue size {}", jobInstanceId,
+ kafkaSendQueue.size(), ex);
+ }
+ } else {
+ updateSuccessSendMetrics(batchMsg);
+ }
+
+ if (LOGGER.isDebugEnabled()) {
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ if (metadata != null) {
+ LOGGER.debug("acked job[{}] message partition:{} ofset:{}", jobInstanceId, metadata.partition(),
+ metadata.offset());
+ }
+ LOGGER.debug("job[{}] send data to kafka elapsed time: {}", jobInstanceId, elapsedTime);
+ }
+ }
+ }
+}
diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
index 5050e39dd..090ab2c33 100755
--- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
+++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sinks/ProxySink.java
@@ -82,12 +82,10 @@ public class ProxySink extends AbstractSink {
});
// increment the count of successful sinks
sinkMetric.sinkSuccessCount.incrementAndGet();
- } else {
- // increment the count of failed sinks
- sinkMetric.sinkFailCount.incrementAndGet();
}
}
} catch (Exception e) {
+ sinkMetric.sinkFailCount.incrementAndGet();
LOGGER.error("write message to Proxy sink error", e);
} catch (Throwable t) {
ThreadUtils.threadThrowableHandler(Thread.currentThread(), t);
diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
new file mode 100644
index 000000000..83030dc81
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sinks/KafkaSinkTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.plugin.sinks;
+
+import org.apache.inlong.agent.conf.JobProfile;
+import org.apache.inlong.agent.message.ProxyMessage;
+import org.apache.inlong.agent.plugin.AgentBaseTestsHelper;
+import org.apache.inlong.agent.plugin.MiniAgent;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_GROUP_ID;
+import static org.apache.inlong.agent.constant.CommonConstants.PROXY_KEY_STREAM_ID;
+import static org.junit.Assert.assertEquals;
+
+ /**
+ * Checks that messages written to the sink are counted as successful sinks.
+ * NOTE(review): the sink under test is a MockSink, not a KafkaSink - confirm this is the intended
+ * coverage for the Kafka sink.
+ */
+ public class KafkaSinkTest {
+
+ private static MockSink kafkaSink;
+ private static JobProfile jobProfile;
+ private static AgentBaseTestsHelper helper;
+ private static MiniAgent agent;
+
+ /**
+ * Loads kafkaSinkJob.json, adds MQ cluster and topic settings to the profile and initializes the sink.
+ */
+ @BeforeClass
+ public static void setUp() throws Exception {
+ helper = new AgentBaseTestsHelper(KafkaSinkTest.class.getName()).setupAgentHome();
+ agent = new MiniAgent();
+ jobProfile = JobProfile.parseJsonFile("kafkaSinkJob.json");
+ jobProfile.set("job.mqClusters",
+ "[{\"url\":\"mqurl\",\"token\":\"token\",\"mqType\":\"KAFKA\",\"params\":{}}]");
+ jobProfile.set("job.topicInfo", "{\"topic\":\"topic\",\"inlongGroupId\":\"groupId\"}");
+ System.out.println(jobProfile.toJsonStr());
+ kafkaSink = new MockSink();
+ kafkaSink.init(jobProfile);
+ }
+
+ /**
+ * Writes a fixed number of messages and expects the success counter to equal that number.
+ */
+ @Test
+ public void testWrite() {
+ String body = "testMesage";
+ Map<String, String> attr = new HashMap<>();
+ attr.put(PROXY_KEY_GROUP_ID, "groupId");
+ attr.put(PROXY_KEY_STREAM_ID, "streamId");
+ long count = 5;
+ // the loop bound and the expected value share one variable so they cannot drift apart
+ for (long i = 0; i < count; i++) {
+ kafkaSink.write(new ProxyMessage(body.getBytes(StandardCharsets.UTF_8), attr));
+ }
+ // expected value first, so a failure message reports expected and actual the right way round
+ assertEquals(count, kafkaSink.sinkMetric.sinkSuccessCount.get());
+ }
+
+ }
diff --git a/inlong-agent/agent-plugins/src/test/resources/kafkaSinkJob.json b/inlong-agent/agent-plugins/src/test/resources/kafkaSinkJob.json
new file mode 100644
index 000000000..4f5110439
--- /dev/null
+++ b/inlong-agent/agent-plugins/src/test/resources/kafkaSinkJob.json
@@ -0,0 +1,15 @@
+{
+ "job": {
+ "id": 1,
+ "instance.id": "job_1",
+ "source": "org.apache.inlong.agent.plugin.sources.TextFileSource",
+ "sink": "org.apache.inlong.agent.plugin.sinks.KafkaSink",
+ "channel": "org.apache.inlong.agent.plugin.channel.MemoryChannel",
+ "groupId": "groupId",
+ "streamId": "streamId"
+ },
+ "proxy": {
+ "inlongGroupId": "groupId",
+ "inlongStreamId": "streamId"
+ }
+}
\ No newline at end of file
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
index 4b94d8391..114c7db62 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/MQClusterInfo.java
@@ -17,6 +17,7 @@
package org.apache.inlong.common.pojo.dataproxy;
+import lombok.ToString;
import org.apache.commons.lang3.StringUtils;
import java.util.HashMap;
@@ -25,6 +26,7 @@ import java.util.Map;
/**
* MQ cluster info.
*/
+@ToString
public class MQClusterInfo {
private String url;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index 187b92bff..f83a559fc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -40,6 +40,8 @@ public class InlongConstants {
*/
public static final String COMMA = ",";
+ public static final String DOT = ".";
+
public static final String BLANK = " ";
public static final String SLASH = "/";
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
index d644bb8d8..b3a2d7e92 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AgentServiceImpl.java
@@ -84,6 +84,8 @@ import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.inlong.manager.common.consts.InlongConstants.DOT;
+
/**
* Agent service layer implementation
*/
@@ -476,9 +478,8 @@ public class AgentServiceImpl implements AgentService {
// add mq cluster setting
List<MQClusterInfo> mqSet = new ArrayList<>();
List<String> clusterTagList = Collections.singletonList(groupEntity.getInlongClusterTag());
- List<String> typeList = Arrays.asList(ClusterType.TUBEMQ, ClusterType.PULSAR);
ClusterPageRequest pageRequest = ClusterPageRequest.builder()
- .typeList(typeList)
+ .type(groupEntity.getMqType())
.clusterTagList(clusterTagList)
.build();
List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(pageRequest);
@@ -518,6 +519,11 @@ public class AgentServiceImpl implements AgentService {
topicConfig.setInlongGroupId(groupId);
topicConfig.setTopic(mqResource);
dataConfig.setTopicInfo(topicConfig);
+ } else if (MQType.KAFKA.equals(mqType)) {
+ DataProxyTopicInfo topicConfig = new DataProxyTopicInfo();
+ topicConfig.setInlongGroupId(groupId);
+ topicConfig.setTopic(groupEntity.getMqResource() + DOT + streamEntity.getMqResource());
+ dataConfig.setTopicInfo(topicConfig);
}
} else {
LOGGER.warn("set syncSend=[0] as the stream not exists for groupId={}, streamId={}", groupId, streamId);