You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/25 04:42:48 UTC
[inlong] branch master updated: [INLONG-5623][SDK] Support multi-topic fetcher for Kafka (#5659)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new f8d7f8bba [INLONG-5623][SDK] Support multi-topic fetcher for Kafka (#5659)
f8d7f8bba is described below
commit f8d7f8bba6a6737db899c0cc6e4070affdb24d1a
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Thu Aug 25 12:42:41 2022 +0800
[INLONG-5623][SDK] Support multi-topic fetcher for Kafka (#5659)
---
.../inlong/sdk/sort/api/MultiTopicsFetcher.java | 17 +
.../sort/fetcher/kafka/AckOffsetOnRebalance.java | 36 ++
.../fetcher/kafka/KafkaMultiTopicsFetcher.java | 388 +++++++++++++++++++++
.../fetcher/pulsar/PulsarMultiTopicsFetcher.java | 16 -
4 files changed, 441 insertions(+), 16 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java
index 7f7ab6ec4..557ebaa1b 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/MultiTopicsFetcher.java
@@ -19,6 +19,7 @@ package org.apache.inlong.sdk.sort.api;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -65,4 +66,20 @@ public abstract class MultiTopicsFetcher implements TopicFetcher {
this.executor = Executors.newSingleThreadScheduledExecutor();
}
+    /**
+     * Decides whether the current subscription has to be rebuilt for {@code newTopics}.
+     *
+     * <p>An update is needed when the number of topics differs, when a new topic is not
+     * online yet, or when an online topic is no longer equal to its new definition
+     * (for example because its properties changed).
+     *
+     * @param newTopics the topics that should be consumed from now on
+     * @return {@code true} if the online topics must be replaced by {@code newTopics}
+     */
+    protected boolean needUpdate(Collection<InLongTopic> newTopics) {
+        if (newTopics.size() != onlineTopics.size()) {
+            return true;
+        }
+        for (InLongTopic topic : newTopics) {
+            // compare against the online topic with the same name; a missing entry (null)
+            // or a changed definition both require an update
+            if (!Objects.equals(topic, onlineTopics.get(topic.getTopic()))) {
+                return true;
+            }
+        }
+        return false;
+    }
+
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
index 1aec582fb..4c8d456c6 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
@@ -26,7 +26,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collection;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
@@ -34,12 +36,22 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
private final String clusterId;
private final Seeker seeker;
private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
+ private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+ /**
+ * Creates a listener without an ack-offset map by delegating to the four-argument
+ * constructor with {@code null}. With a null ack-offset map, onPartitionsRevoked
+ * does not call ackRevokedPartitions.
+ *
+ * @param clusterId cluster id used in the rebalance log messages
+ * @param seeker seeker stored by this listener
+ * @param commitOffsetMap map of offsets to commit, keyed by partition
+ */
public AckOffsetOnRebalance(String clusterId, Seeker seeker,
ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
+ this(clusterId, seeker, commitOffsetMap, null);
+ }
+
+ /**
+ * Creates a listener that stores all four arguments as-is.
+ *
+ * @param clusterId cluster id used in the rebalance log messages
+ * @param seeker seeker stored by this listener
+ * @param commitOffsetMap map that receives the offsets to commit for revoked partitions; may be null
+ * @param ackOffsetMap per-partition map of offset to acked flag; may be null, in which case
+ * onPartitionsRevoked skips ackRevokedPartitions
+ */
+ public AckOffsetOnRebalance(
+ String clusterId,
+ Seeker seeker,
+ ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+ ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap) {
this.clusterId = clusterId;
this.seeker = seeker;
this.commitOffsetMap = commitOffsetMap;
+ this.ackOffsetMap = ackOffsetMap;
}
@Override
@@ -48,6 +60,30 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
collection.forEach((v) -> {
LOGGER.info("clusterId:{},onPartitionsRevoked:{}", clusterId, v.toString());
});
+ if (Objects.nonNull(ackOffsetMap) && Objects.nonNull(commitOffsetMap)) {
+ ackRevokedPartitions(collection);
+ }
+ }
+
+    /**
+     * Stages the commit offset of every revoked partition and stops tracking its acks.
+     *
+     * <p>For each partition the tracked offsets are walked in ascending order up to the
+     * first offset that has not been acked. The offset staged in {@code commitOffsetMap}
+     * is the last acked offset plus one, because Kafka expects the committed offset to be
+     * the position of the next record to read.
+     *
+     * @param collection the partitions that are being revoked
+     */
+    private void ackRevokedPartitions(Collection<TopicPartition> collection) {
+        collection.forEach(tp -> {
+            // a single atomic remove() replaces the containsKey()/remove() pair, which could
+            // observe the entry disappearing between the two calls
+            ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.remove(tp);
+            if (tpOffsetMap == null) {
+                return;
+            }
+            long lastAckedOffset = -1;
+            for (Long ackOffset : tpOffsetMap.keySet()) {
+                // Boolean.TRUE.equals also covers an entry removed while iterating (null)
+                if (!Boolean.TRUE.equals(tpOffsetMap.get(ackOffset))) {
+                    break;
+                }
+                lastAckedOffset = ackOffset;
+            }
+            // the first offset has not been acked, nothing to commit
+            if (lastAckedOffset == -1) {
+                return;
+            }
+            commitOffsetMap.put(tp, new OffsetAndMetadata(lastAckedOffset + 1));
+        });
     }
@Override
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
new file mode 100644
index 000000000..2a4bbdd15
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaMultiTopicsFetcher.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.fetcher.kafka;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.Deserializer;
+import org.apache.inlong.sdk.sort.api.Interceptor;
+import org.apache.inlong.sdk.sort.api.MultiTopicsFetcher;
+import org.apache.inlong.sdk.sort.api.SeekerFactory;
+import org.apache.inlong.sdk.sort.api.SortClientConfig;
+import org.apache.inlong.sdk.sort.entity.InLongMessage;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.entity.MessageRecord;
+import org.apache.inlong.sdk.sort.fetcher.pulsar.PulsarMultiTopicsFetcher;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.consumer.RangeAssignor;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Kafka multi topics fetcher
+ */
+public class KafkaMultiTopicsFetcher extends MultiTopicsFetcher {
+    // bound to this class (not the Pulsar fetcher) so log lines are attributed correctly
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaMultiTopicsFetcher.class);
+    // offsets staged for the next commit, keyed by partition
+    private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
+    // per partition: fetched offset -> whether it has been acked yet
+    private final ConcurrentHashMap<TopicPartition, ConcurrentSkipListMap<Long, Boolean>> ackOffsetMap;
+    private final String bootstrapServers;
+    private ConsumerRebalanceListener listener;
+    private KafkaConsumer<byte[], byte[]> consumer;
+
+ /**
+ * Creates the fetcher. This constructor passes the first four arguments to the parent
+ * constructor, stores the bootstrap servers and creates the two empty offset maps;
+ * the Kafka consumer itself is created in init().
+ *
+ * @param topics topics passed to the parent constructor
+ * @param context client context passed to the parent constructor
+ * @param interceptor interceptor passed to the parent constructor
+ * @param deserializer deserializer passed to the parent constructor
+ * @param bootstrapServers value used for the consumer's bootstrap.servers setting
+ */
+ public KafkaMultiTopicsFetcher(
+ List<InLongTopic> topics,
+ ClientContext context,
+ Interceptor interceptor,
+ Deserializer deserializer,
+ String bootstrapServers) {
+ super(topics, context, interceptor, deserializer);
+ this.bootstrapServers = bootstrapServers;
+ this.commitOffsetMap = new ConcurrentHashMap<>();
+ this.ackOffsetMap = new ConcurrentHashMap<>();
+ }
+
+    /**
+     * Creates the Kafka consumer, the seeker and the rebalance listener, then subscribes
+     * the consumer to all online topics.
+     *
+     * @return {@code true} on success; {@code false} if any step threw (the error is logged)
+     */
+    @Override
+    public boolean init() {
+        try {
+            // fail with a descriptive message before any consumer is created
+            InLongTopic topic = onlineTopics.values().stream()
+                    .findFirst()
+                    .orElseThrow(() -> new IllegalStateException("no online topic to subscribe"));
+            this.consumer = createKafkaConsumer();
+            this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
+            // hand over ackOffsetMap exactly as updateAll() does; without it the listener
+            // never stages acked offsets when partitions are revoked
+            this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
+                    commitOffsetMap, ackOffsetMap);
+            consumer.subscribe(onlineTopics.keySet(), listener);
+            // NOTE(review): nothing here starts the Fetcher runnable - confirm it is started elsewhere
+            return true;
+        } catch (Throwable t) {
+            LOGGER.error("failed to init kafka consumer: ", t);
+            return false;
+        }
+    }
+
+ private KafkaConsumer<byte[], byte[]> createKafkaConsumer() {
+ Properties properties = new Properties();
+ properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ properties.put(ConsumerConfig.GROUP_ID_CONFIG, context.getConfig().getSortTaskId());
+ properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ ByteArrayDeserializer.class.getName());
+ properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG,
+ context.getConfig().getKafkaSocketRecvBufferSize());
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ SortClientConfig.ConsumeStrategy offsetResetStrategy = context.getConfig().getOffsetResetStrategy();
+ if (offsetResetStrategy == SortClientConfig.ConsumeStrategy.lastest
+ || offsetResetStrategy == SortClientConfig.ConsumeStrategy.lastest_absolutely) {
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
+ } else if (offsetResetStrategy == SortClientConfig.ConsumeStrategy.earliest
+ || offsetResetStrategy == SortClientConfig.ConsumeStrategy.earliest_absolutely) {
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ } else {
+ properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
+ }
+ properties.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,
+ context.getConfig().getKafkaFetchSizeBytes());
+ properties.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,
+ context.getConfig().getKafkaFetchWaitMs());
+ properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
+ RangeAssignor.class.getName());
+ properties.put(ConsumerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 120000L);
+ LOGGER.info("start to create kafka consumer:{}", properties);
+ return new KafkaConsumer<>(properties);
+ }
+
+    /**
+     * Marks one fetched message as acked so that its offset can be committed later.
+     *
+     * @param msgOffset offset key in the form {@code topic:partitionId:offset},
+     *                  such as {@code topic1:20:1746839}
+     * @throws Exception if {@code msgOffset} does not consist of exactly three
+     *                   {@code :}-separated parts
+     */
+    @Override
+    public void ack(String msgOffset) throws Exception {
+        String[] offset = msgOffset.split(":");
+        if (offset.length != 3) {
+            throw new Exception("offset is illegal, the correct format is topic:partitionId:offset, "
+                    + "the error offset is:" + msgOffset);
+        }
+
+        // parse topic partition offset
+        TopicPartition topicPartition = new TopicPartition(offset[0], Integer.parseInt(offset[1]));
+        long ackOffset = Long.parseLong(offset[2]);
+
+        // one lookup only: the partition map may be removed concurrently by
+        // AckOffsetOnRebalance, so containsKey() followed by get() could hit null
+        ConcurrentSkipListMap<Long, Boolean> tpOffsetMap = ackOffsetMap.get(topicPartition);
+
+        // replace() flips the flag only while the offset is still tracked, so an offset
+        // that was already committed and removed is never re-inserted
+        if (tpOffsetMap == null || tpOffsetMap.replace(ackOffset, true) == null) {
+            LOGGER.warn("did not find offsetMap or ack offset of {}, offset {}, just ignore it",
+                    topicPartition, ackOffset);
+        }
+    }
+
+ /**
+ * Pauses fetching on every partition currently assigned to the consumer.
+ * NOTE(review): KafkaConsumer is documented as not safe for multi-threaded access -
+ * confirm this is never called while another thread is polling.
+ */
+ @Override
+ public void pause() {
+ consumer.pause(consumer.assignment());
+ }
+
+ /**
+ * Resumes fetching on every partition currently assigned to the consumer.
+ * NOTE(review): KafkaConsumer is documented as not safe for multi-threaded access -
+ * confirm this is never called while another thread is polling.
+ */
+ @Override
+ public void resume() {
+ consumer.resume(consumer.assignment());
+ }
+
+    /**
+     * Stops the fetcher: marks it closed, interrupts the fetch thread, commits the offsets
+     * acked so far and closes the consumer. Failures are logged and never propagated.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean close() {
+        this.closed = true;
+        try {
+            if (fetchThread != null) {
+                fetchThread.interrupt();
+            }
+            if (consumer != null) {
+                try {
+                    prepareCommit();
+                    consumer.commitSync(commitOffsetMap);
+                } finally {
+                    // release the consumer even when the final commit fails,
+                    // otherwise its connections would leak
+                    consumer.close();
+                }
+            }
+            commitOffsetMap.clear();
+        } catch (Throwable t) {
+            LOGGER.warn("got exception in multi topic fetcher close: ", t);
+        }
+        LOGGER.info("closed kafka multi topic fetcher");
+        return true;
+    }
+
+ /**
+ * @return the closed flag, which close() sets to true
+ */
+ @Override
+ public boolean isClosed() {
+ return closed;
+ }
+
+ /**
+ * Sets the stop-consume flag. While it is true the fetch loop only sleeps and does not poll.
+ *
+ * @param stopConsume true to pause the fetch loop, false to let it poll again
+ */
+ @Override
+ public void setStopConsume(boolean stopConsume) {
+ this.stopConsume = stopConsume;
+ }
+
+ /**
+ * @return the current value of the stop-consume flag
+ */
+ @Override
+ public boolean isStopConsume() {
+ return stopConsume;
+ }
+
+ /**
+ * @return a new list holding the current values of onlineTopics
+ */
+ @Override
+ public List<InLongTopic> getTopics() {
+ return new ArrayList<>(onlineTopics.values());
+ }
+
+    /**
+     * Replaces the subscribed topics when the given list differs from the online ones.
+     *
+     * @param topics the topics that should be consumed from now on
+     * @return the result of the update, or {@code false} when no update was necessary
+     */
+    @Override
+    public boolean updateTopics(List<InLongTopic> topics) {
+        if (!needUpdate(topics)) {
+            LOGGER.info("no need to update topics");
+            return false;
+        }
+        return updateAll(topics);
+    }
+
+    /**
+     * Replaces the online topics with {@code newTopics} and re-subscribes the consumer.
+     * Consumption is flagged as stopped while the switch happens and is always
+     * re-enabled afterwards, even if the switch fails.
+     *
+     * @param newTopics the topics to consume from now on
+     * @return {@code true} if the subscription was replaced; {@code false} if
+     *         {@code newTopics} is null or empty
+     */
+    private boolean updateAll(Collection<InLongTopic> newTopics) {
+        if (CollectionUtils.isEmpty(newTopics)) {
+            LOGGER.error("new topics is empty or null");
+            return false;
+        }
+
+        // stop
+        this.setStopConsume(true);
+        try {
+            // update
+            this.onlineTopics = newTopics.stream().collect(Collectors.toMap(InLongTopic::getTopic, t -> t));
+            // newTopics is not empty here, so a first element always exists
+            InLongTopic topic = onlineTopics.values().stream().findFirst().get();
+            this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
+            this.listener = new AckOffsetOnRebalance(topic.getInLongCluster().getClusterId(), seeker,
+                    commitOffsetMap, ackOffsetMap);
+            Optional.ofNullable(interceptor).ifPresent(i -> i.configure(topic));
+
+            // subscribe new
+            consumer.subscribe(onlineTopics.keySet(), listener);
+            return true;
+        } finally {
+            // resume in every case: if any step above throws, leaving the flag set
+            // would keep the fetch loop sleeping forever
+            this.setStopConsume(false);
+        }
+    }
+
+    /**
+     * Moves every fully acked offset prefix from {@code ackOffsetMap} into
+     * {@code commitOffsetMap}.
+     *
+     * <p>For each partition the tracked offsets are walked in ascending order up to the
+     * first offset that has not been acked. Those offsets are dropped from tracking and
+     * the last one plus one is staged for commit, because Kafka expects the committed
+     * offset to be the position of the next record to read.
+     */
+    private void prepareCommit() {
+        ackOffsetMap.forEach((topicPartition, tpOffsetMap) -> {
+            synchronized (tpOffsetMap) {
+                // collect the acked prefix
+                List<Long> ackedPrefix = new ArrayList<>();
+                for (Map.Entry<Long, Boolean> entry : tpOffsetMap.entrySet()) {
+                    if (!entry.getValue()) {
+                        break;
+                    }
+                    ackedPrefix.add(entry.getKey());
+                }
+                // the first offset has not been acked, nothing to commit
+                if (ackedPrefix.isEmpty()) {
+                    return;
+                }
+
+                // stop tracking the acked offsets and stage the next position to read
+                long lastAckedOffset = ackedPrefix.get(ackedPrefix.size() - 1);
+                ackedPrefix.forEach(tpOffsetMap::remove);
+                commitOffsetMap.put(topicPartition, new OffsetAndMetadata(lastAckedOffset + 1));
+            }
+        });
+    }
+
+ public class Fetcher implements Runnable {
+
+        /**
+         * Stages the acked offsets and commits them asynchronously.
+         * Nothing is sent when there is no consumer or nothing to commit;
+         * a failed asynchronous commit is logged instead of being dropped silently.
+         */
+        private void commitKafkaOffset() {
+            prepareCommit();
+            if (consumer == null || commitOffsetMap.isEmpty()) {
+                return;
+            }
+            try {
+                // commit a snapshot so that later changes to the staging map
+                // cannot affect the request that is in flight
+                Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>(commitOffsetMap);
+                consumer.commitAsync(toCommit, (offsets, exception) -> {
+                    if (exception != null) {
+                        LOGGER.error("async commit of kafka offsets {} failed: ", offsets, exception);
+                    }
+                });
+                // drop only what was handed to the commit; entries staged meanwhile are kept
+                toCommit.forEach((tp, staged) -> commitOffsetMap.remove(tp, staged));
+            } catch (Exception e) {
+                LOGGER.error("commit kafka offset failed: ", e);
+            }
+        }
+
+ /**
+ * put the received msg to onFinished method
+ *
+ * @param messageRecords {@link List < MessageRecord >}
+ */
+ private void handleAndCallbackMsg(List<MessageRecord> messageRecords) {
+ long start = System.currentTimeMillis();
+ try {
+ context.getDefaultStateCounter().addCallbackTimes(1);
+ context.getConfig().getCallback().onFinishedBatch(messageRecords);
+ context.getDefaultStateCounter()
+ .addCallbackTimeCost(System.currentTimeMillis() - start)
+ .addCallbackDoneTimes(1);
+ } catch (Exception e) {
+ context.getDefaultStateCounter().addCallbackErrorTimes(1);
+ LOGGER.error("failed to callback: ", e);
+ }
+ }
+
+        /**
+         * Registers the offset as fetched but not yet acked and builds its offset key.
+         *
+         * @param topic topic of the record
+         * @param partitionId partition of the record
+         * @param offset offset of the record
+         * @return the key in the form {@code topic:partitionId:offset}
+         */
+        private String getOffset(String topic, int partitionId, long offset) {
+            ConcurrentSkipListMap<Long, Boolean> partitionOffsets = ackOffsetMap.computeIfAbsent(
+                    new TopicPartition(topic, partitionId), key -> new ConcurrentSkipListMap<>());
+            partitionOffsets.put(offset, false);
+            return String.join(":", topic, String.valueOf(partitionId), String.valueOf(offset));
+        }
+
+        /**
+         * Converts the Kafka record headers into a string map.
+         * Values are decoded as UTF-8 regardless of the platform default charset;
+         * headers without a value are skipped. If a key occurs more than once the
+         * last value wins.
+         *
+         * @param headers the headers of one record
+         * @return a new mutable map from header key to decoded value
+         */
+        private Map<String, String> getMsgHeaders(Headers headers) {
+            Map<String, String> headerMap = new HashMap<>();
+            for (Header header : headers) {
+                byte[] value = header.value();
+                if (value == null) {
+                    // new String(null) would throw and abort the whole fetched batch
+                    continue;
+                }
+                headerMap.put(header.key(), new String(value, java.nio.charset.StandardCharsets.UTF_8));
+            }
+            return headerMap;
+        }
+
+ @Override
+ public void run() {
+ boolean hasPermit;
+ while (true) {
+ hasPermit = false;
+ try {
+ if (context.getConfig().isStopConsume() || stopConsume) {
+ TimeUnit.MILLISECONDS.sleep(50);
+ continue;
+ }
+
+ if (sleepTime > 0) {
+ TimeUnit.MILLISECONDS.sleep(sleepTime);
+ }
+
+ context.acquireRequestPermit();
+ hasPermit = true;
+ // fetch from kafka
+ fetchFromKafka();
+ // commit
+ commitKafkaOffset();
+ } catch (Exception e) {
+ context.getDefaultStateCounter().addFetchErrorTimes(1);
+ LOGGER.error("failed in kafka multi topic fetcher: ", e);
+ } finally {
+ if (hasPermit) {
+ context.releaseRequestPermit();
+ }
+ }
+ }
+ }
+
+        /**
+         * Polls the consumer once and delivers every record to the callback.
+         *
+         * <p>Each record is registered as un-acked, deserialized and passed through the
+         * interceptor (when one is set). A record whose messages are all filtered out is
+         * acked right away. An empty poll increases the back-off once the configured
+         * number of empty polls is reached; a non-empty poll resets the back-off.
+         *
+         * @throws Exception if polling, deserializing or acking fails
+         */
+        private void fetchFromKafka() throws Exception {
+            context.getDefaultStateCounter().addMsgCount(1).addFetchTimes(1);
+
+            long startFetchTime = System.currentTimeMillis();
+            ConsumerRecords<byte[], byte[]> records = consumer
+                    .poll(Duration.ofMillis(context.getConfig().getKafkaFetchWaitMs()));
+            context.getDefaultStateCounter().addFetchTimeCost(System.currentTimeMillis() - startFetchTime);
+            if (null != records && !records.isEmpty()) {
+
+                for (ConsumerRecord<byte[], byte[]> msg : records) {
+                    String topicName = msg.topic();
+                    InLongTopic topic = onlineTopics.get(topicName);
+                    if (topic == null) {
+                        // onlineTopics can be replaced by updateAll(); skip the record instead of
+                        // failing with a NullPointerException that would drop the rest of the batch
+                        LOGGER.warn("skip record of topic {} which is not online any more", topicName);
+                        continue;
+                    }
+                    String offsetKey = getOffset(topicName, msg.partition(), msg.offset());
+                    List<InLongMessage> inLongMessages = deserializer
+                            .deserialize(context, topic, getMsgHeaders(msg.headers()), msg.value());
+                    // the interceptor is optional, as in updateAll()
+                    if (interceptor != null) {
+                        inLongMessages = interceptor.intercept(inLongMessages);
+                    }
+                    if (inLongMessages.isEmpty()) {
+                        ack(offsetKey);
+                        continue;
+                    }
+
+                    List<MessageRecord> msgs = new ArrayList<>();
+                    msgs.add(new MessageRecord(topic.getTopicKey(),
+                            inLongMessages,
+                            offsetKey, System.currentTimeMillis()));
+                    context.getStateCounterByTopic(topic).addConsumeSize(msg.value().length);
+                    context.getStateCounterByTopic(topic).addMsgCount(msgs.size());
+                    handleAndCallbackMsg(msgs);
+                }
+                sleepTime = 0L;
+            } else {
+                context.getDefaultStateCounter().addEmptyFetchTimes(1);
+                emptyFetchTimes++;
+                if (emptyFetchTimes >= context.getConfig().getEmptyPollTimes()) {
+                    // grow the back-off by one step, capped at the configured maximum
+                    sleepTime += context.getConfig().getEmptyPollSleepStepMs();
+                    sleepTime = Math.min(sleepTime, context.getConfig().getMaxEmptyPollSleepMs());
+                    emptyFetchTimes = 0;
+                }
+            }
+        }
+ }
+}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
index 3244e9d5b..938869bf7 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/pulsar/PulsarMultiTopicsFetcher.java
@@ -304,22 +304,6 @@ public class PulsarMultiTopicsFetcher extends MultiTopicsFetcher {
return false;
}
- private boolean needUpdate(Collection<InLongTopic> newTopics) {
- if (newTopics.size() != onlineTopics.size()) {
- return true;
- }
- // all topic should share the same properties in one task
- if (Objects.equals(newTopics.stream().findFirst(), onlineTopics.values().stream().findFirst())) {
- return true;
- }
- for (InLongTopic topic : newTopics) {
- if (!onlineTopics.containsKey(topic.getTopic())) {
- return true;
- }
- }
- return false;
- }
-
public class Fetcher implements Runnable {
/**