You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/12 08:11:03 UTC
[inlong] branch master updated: [INLONG-5495][SDK] Support multi-topic manager (#5802)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager (#5802)
08dfa0ecc is described below
commit 08dfa0ecc48944f6fc45197827c177e9eb07d528
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Oct 12 16:10:58 2022 +0800
[INLONG-5495][SDK] Support multi-topic manager (#5802)
---
.../sdk/sort/api/InlongTopicManagerFactory.java | 3 +-
.../sort/fetcher/kafka/AckOffsetOnRebalance.java | 10 +-
.../fetcher/kafka/KafkaSingleTopicFetcher.java | 2 +-
.../sort/impl/kafka/InLongKafkaFetcherImpl.java | 2 +-
.../sdk/sort/manager/InlongMultiTopicManager.java | 289 +++++++++++++++++++++
5 files changed, 300 insertions(+), 6 deletions(-)
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
index a8219f95f..834315236 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
@@ -19,6 +19,7 @@
package org.apache.inlong.sdk.sort.api;
import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType;
+import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager;
import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
/**
@@ -47,6 +48,6 @@ public class InlongTopicManagerFactory {
public static TopicManager createMultiTopicManager(
ClientContext context,
QueryConsumeConfig queryConsumeConfig) {
- return null;
+ return new InlongMultiTopicManager(context, queryConsumeConfig);
}
}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
index 0cf27bc82..d0a66c6e3 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
@@ -51,9 +51,13 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
private final AtomicLong revokedNum = new AtomicLong(0);
private final AtomicLong assignedNum = new AtomicLong(0);
- public AckOffsetOnRebalance(String clusterId, Seeker seeker,
- ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
- this(clusterId, seeker, commitOffsetMap, null, null);
+    /**
+     * Creates a rebalance listener bound to the given Kafka consumer.
+     *
+     * <p>Delegates to the five-argument constructor, passing {@code null} for its fourth argument.
+     *
+     * @param clusterId id of the Kafka cluster the consumer belongs to
+     * @param seeker seeker handed through to the delegated constructor
+     * @param commitOffsetMap shared map of partition offsets handed through to the delegated constructor
+     * @param consumer the consumer whose subscription this listener is registered on
+     */
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, null, consumer);
+    }
public AckOffsetOnRebalance(
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index 9ed97261a..3f903720b 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -79,7 +79,7 @@ public class KafkaSingleTopicFetcher extends SingleTopicFetcher {
this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
consumer.subscribe(Collections.singletonList(topic.getTopic()),
new AckOffsetOnRebalance(this.topic.getInLongCluster().getClusterId(), seeker,
- commitOffsetMap));
+ commitOffsetMap, consumer));
} else {
LOGGER.info("consumer is null");
return false;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index d2da76c0f..2dd9c2cf1 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -77,7 +77,7 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
this.seeker = SeekerFactory.createKafkaSeeker(consumer, inLongTopic);
consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), seeker,
- commitOffsetMap));
+ commitOffsetMap, consumer));
} else {
logger.info("consumer is null");
return false;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
new file mode 100644
index 000000000..f19f9261c
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.manager;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
+import org.apache.inlong.sdk.sort.util.PeriodicTask;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Inlong manager that maintains {@link org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}s.
+ * It is suitable for cases where topics share the same configuration,
+ * and each consumer consumes multiple topics.
+ *
+ * <p>Fetchers are grouped by message-queue type (Kafka, Pulsar, TubeMQ) and by cluster id.
+ * A background {@link PeriodicTask} periodically queries the current consume config and
+ * creates, updates or closes fetchers so that they follow the assigned topics.
+ */
+public class InlongMultiTopicManager extends TopicManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongMultiTopicManager.class);
+
+    // fetchers of each message-queue type, keyed by cluster id
+    private final Map<String, List<TopicFetcher>> pulsarFetchers = new ConcurrentHashMap<>();
+    private final Map<String, List<TopicFetcher>> kafkaFetchers = new ConcurrentHashMap<>();
+    private final Map<String, List<TopicFetcher>> tubeFetchers = new ConcurrentHashMap<>();
+    // every live fetcher, keyed by its fetch key
+    private final Map<String, TopicFetcher> allFetchers = new ConcurrentHashMap<>();
+    // replaced by the metadata update thread and read by client threads, hence volatile
+    private volatile Set<String> allTopics = new HashSet<>();
+    private final PeriodicTask updateMetaDataWorker;
+
+    // set by offlineAllTopicsAndPartitions() and read by the update thread, hence volatile
+    private volatile boolean stopAssign = false;
+    // number of consumers created per cluster (per topic for TubeMQ)
+    private final int consumerSize;
+
+    /**
+     * Creates the manager and immediately starts the periodic metadata update task.
+     *
+     * @param context client context providing the sort SDK configuration
+     * @param queryConsumeConfig source of the consume config; may be null, in which case
+     *         every update round only logs an error
+     */
+    public InlongMultiTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
+        super(context, queryConsumeConfig);
+        this.consumerSize = context.getConfig().getMaxConsumerSize();
+        updateMetaDataWorker = new UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(),
+                TimeUnit.SECONDS);
+        String threadName = "sortsdk_multi_topic_manager_" + context.getConfig().getSortTaskId()
+                + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
+        updateMetaDataWorker.start(threadName);
+        LOGGER.info("create InlongMultiTopicManager success");
+    }
+
+    /**
+     * Stops the update task, then closes and forgets every fetcher.
+     *
+     * @return always {@code true}
+     */
+    @Override
+    public boolean clean() {
+        LOGGER.info("start clean {}", context.getConfig().getSortTaskId());
+        close();
+        offlineAllTopicsAndPartitions();
+        LOGGER.info("end clean {}", context.getConfig().getSortTaskId());
+        return true;
+    }
+
+    /**
+     * Not supported by this manager: topics are driven by the consume config. Always returns null.
+     */
+    @Override
+    public TopicFetcher addTopic(InLongTopic topic) {
+        return null;
+    }
+
+    /**
+     * Not supported by this manager: topics are driven by the consume config. Always returns null.
+     */
+    @Override
+    public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) {
+        return null;
+    }
+
+    @Override
+    public TopicFetcher getFetcher(String fetchKey) {
+        return allFetchers.get(fetchKey);
+    }
+
+    @Override
+    public Collection<TopicFetcher> getAllFetchers() {
+        return allFetchers.values();
+    }
+
+    @Override
+    public Set<String> getManagedInLongTopics() {
+        return allTopics;
+    }
+
+    /**
+     * Closes every fetcher and clears all bookkeeping. Assignment is suspended while this runs.
+     */
+    @Override
+    public void offlineAllTopicsAndPartitions() {
+        String subscribeId = context.getConfig().getSortTaskId();
+        try {
+            LOGGER.info("start offline {}", subscribeId);
+            stopAssign = true;
+            Set<Map.Entry<String, TopicFetcher>> entries = allFetchers.entrySet();
+            for (Map.Entry<String, TopicFetcher> entry : entries) {
+                String fetchKey = entry.getKey();
+                TopicFetcher topicFetcher = entry.getValue();
+                boolean succ = false;
+                if (topicFetcher != null) {
+                    try {
+                        succ = topicFetcher.close();
+                    } catch (Exception e) {
+                        LOGGER.error("got exception when close fetcher={}", topicFetcher.getTopics(), e);
+                    }
+                }
+                LOGGER.info("close fetcher={} {}", fetchKey, succ);
+            }
+        } catch (Exception e) {
+            LOGGER.error("got exception when offline topics and partitions, ", e);
+        } finally {
+            allFetchers.clear();
+            kafkaFetchers.clear();
+            pulsarFetchers.clear();
+            tubeFetchers.clear();
+            // no fetcher is left, so no topic is managed any more
+            allTopics = new HashSet<>();
+            stopAssign = false;
+            LOGGER.info("close finished {}", subscribeId);
+        }
+    }
+
+    /**
+     * Stops the periodic metadata update task. Fetchers are left untouched.
+     */
+    @Override
+    public void close() {
+        if (updateMetaDataWorker != null) {
+            updateMetaDataWorker.stop();
+        }
+    }
+
+    /**
+     * Reconciles the fetchers with the latest assigned topics.
+     *
+     * @param assignedTopics topics currently assigned to this sort task; a null or empty
+     *         list is ignored and existing fetchers are kept as they are
+     */
+    private void handleUpdatedConsumeConfig(List<InLongTopic> assignedTopics) {
+        if (CollectionUtils.isEmpty(assignedTopics)) {
+            LOGGER.warn("assignedTopics is null or empty, do nothing");
+            return;
+        }
+        this.allTopics = assignedTopics.stream()
+                .map(InLongTopic::getTopic)
+                .collect(Collectors.toSet());
+
+        Map<String, List<InLongTopic>> kafkaTopics = groupByCluster(assignedTopics, InlongTopicTypeEnum.KAFKA);
+        Map<String, List<InLongTopic>> pulsarTopics = groupByCluster(assignedTopics, InlongTopicTypeEnum.PULSAR);
+        Map<String, List<InLongTopic>> tubeTopics = groupByCluster(assignedTopics, InlongTopicTypeEnum.TUBE);
+
+        // A cluster that lost all of its topics is never passed to update*Fetcher below,
+        // so its fetchers must be closed here or they would keep consuming unassigned topics.
+        closeFetchersOfUnassignedClusters(kafkaFetchers, kafkaTopics.keySet());
+        closeFetchersOfUnassignedClusters(pulsarFetchers, pulsarTopics.keySet());
+        closeFetchersOfUnassignedClusters(tubeFetchers, tubeTopics.keySet());
+
+        kafkaTopics.forEach(this::updateKafkaFetcher);
+        pulsarTopics.forEach(this::updatePulsarFetcher);
+        tubeTopics.forEach(this::updateTubeFetcher);
+    }
+
+    /** Groups the topics of the given message-queue type by their cluster id. */
+    private Map<String, List<InLongTopic>> groupByCluster(List<InLongTopic> topics, InlongTopicTypeEnum type) {
+        return topics.stream()
+                .filter(topic -> type.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()));
+    }
+
+    /** Closes and forgets the fetchers of every cluster that is not in {@code assignedClusterIds}. */
+    private void closeFetchersOfUnassignedClusters(
+            Map<String, List<TopicFetcher>> fetchersByCluster,
+            Set<String> assignedClusterIds) {
+        List<String> unassignedClusterIds = fetchersByCluster.keySet().stream()
+                .filter(clusterId -> !assignedClusterIds.contains(clusterId))
+                .collect(Collectors.toList());
+        for (String clusterId : unassignedClusterIds) {
+            List<TopicFetcher> staleFetchers = fetchersByCluster.remove(clusterId);
+            if (staleFetchers == null) {
+                continue;
+            }
+            LOGGER.info("cluster {} has no assigned topic any more, close its {} fetchers",
+                    clusterId, staleFetchers.size());
+            staleFetchers.forEach(this::closeFetcher);
+        }
+    }
+
+    /** Removes a fetcher from {@link #allFetchers} and closes it, logging (not throwing) failures. */
+    private void closeFetcher(TopicFetcher fetcher) {
+        allFetchers.remove(fetcher.getFetchKey());
+        boolean succ = false;
+        try {
+            succ = fetcher.close();
+        } catch (Exception e) {
+            LOGGER.error("got exception when close fetcher={}", fetcher.getTopics(), e);
+        }
+        LOGGER.info("close fetcher={} {}", fetcher.getFetchKey(), succ);
+    }
+
+    /** Records a newly created fetcher in both the per-cluster list and {@link #allFetchers}. */
+    private void registerFetcher(List<TopicFetcher> clusterFetchers, TopicFetcher fetcher) {
+        clusterFetchers.add(fetcher);
+        allFetchers.put(fetcher.getFetchKey(), fetcher);
+    }
+
+    /**
+     * Updates the topics of the Kafka fetchers of one cluster, creating {@code consumerSize}
+     * multi-topic fetchers on first sight of the cluster.
+     *
+     * @param clusterId cluster id
+     * @param topics non-empty list of Kafka topics assigned on that cluster
+     */
+    private void updateKafkaFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = kafkaFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        // groupingBy never yields an empty group, so the first topic always exists
+        String bootstraps = topics.get(0).getInLongCluster().getBootstraps();
+        TopicFetcherBuilder builder = TopicFetcherBuilder.newKafkaBuilder()
+                .bootstrapServers(bootstraps)
+                .topic(topics)
+                .context(context);
+        LOGGER.info("create new kafka multi topic consumer for bootstrap {}, size is {}", bootstraps, consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            TopicFetcher fetcher = builder.subscribe();
+            // NOTE(review): assumes subscribe() may signal failure with null; guard against the NPE
+            if (fetcher == null) {
+                LOGGER.error("failed to create kafka multi topic consumer for bootstrap {}", bootstraps);
+                continue;
+            }
+            registerFetcher(fetchers, fetcher);
+        }
+    }
+
+    /**
+     * Updates the topics of the Pulsar fetchers of one cluster, creating {@code consumerSize}
+     * multi-topic fetchers (each with its own client) on first sight of the cluster.
+     *
+     * @param clusterId cluster id
+     * @param topics non-empty list of Pulsar topics assigned on that cluster
+     */
+    private void updatePulsarFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = pulsarFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        InLongTopic topic = topics.get(0);
+        String bootstraps = topic.getInLongCluster().getBootstraps();
+        LOGGER.info("create new pulsar multi topic consumer for bootstrap {}, size is {}",
+                bootstraps, consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            try {
+                PulsarClient pulsarClient = PulsarClient.builder()
+                        .serviceUrl(bootstraps)
+                        .authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+                        .build();
+                TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder()
+                        .pulsarClient(pulsarClient)
+                        .topic(topics)
+                        .context(context)
+                        .subscribe();
+                if (fetcher == null) {
+                    // the client was created for this fetcher only; release it instead of leaking it
+                    LOGGER.error("failed to create pulsar multi topic consumer for {}", bootstraps);
+                    closePulsarClientQuietly(pulsarClient, bootstraps);
+                    continue;
+                }
+                registerFetcher(fetchers, fetcher);
+            } catch (PulsarClientException e) {
+                LOGGER.error("failed to create pulsar client for {}", bootstraps, e);
+            }
+        }
+    }
+
+    /** Closes a Pulsar client, logging instead of propagating any failure. */
+    private void closePulsarClientQuietly(PulsarClient pulsarClient, String bootstraps) {
+        try {
+            pulsarClient.close();
+        } catch (PulsarClientException e) {
+            LOGGER.warn("failed to close pulsar client for {}", bootstraps, e);
+        }
+    }
+
+    /**
+     * Updates the topics of the TubeMQ fetchers of one cluster. On first sight of the cluster,
+     * creates one session factory per consumer index and one single-topic fetcher per topic
+     * under each of them.
+     *
+     * @param clusterId cluster id
+     * @param topics non-empty list of TubeMQ topics assigned on that cluster
+     */
+    private void updateTubeFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = tubeFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        String bootstraps = topics.get(0).getInLongCluster().getBootstraps();
+        LOGGER.info("create new tube multi topic consumer for bootstrap {}, size is {}",
+                bootstraps, consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            try {
+                TubeClientConfig tubeConfig = new TubeClientConfig(bootstraps);
+                MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig);
+                TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory,
+                        tubeConfig);
+                for (InLongTopic tubeTopic : topics) {
+                    TopicFetcher fetcher = TopicFetcherBuilder.newTubeBuilder()
+                            .tubeConsumerCreater(tubeConsumerCreator)
+                            .topic(tubeTopic)
+                            .context(context)
+                            .subscribe();
+                    if (fetcher == null) {
+                        LOGGER.error("failed to create tube consumer for topic {} on {}",
+                                tubeTopic.getTopic(), bootstraps);
+                        continue;
+                    }
+                    registerFetcher(fetchers, fetcher);
+                }
+            } catch (TubeClientException e) {
+                LOGGER.error("failed to create tube client for {}", bootstraps, e);
+            }
+        }
+    }
+
+    /**
+     * Periodic task that pulls the consume config of this sort task and applies it.
+     */
+    private class UpdateMetaDataThread extends PeriodicTask {
+
+        public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) {
+            super(runInterval, timeUnit, context.getConfig());
+        }
+
+        @Override
+        protected void doWork() {
+            logger.debug("InlongMultiTopicManager doWork");
+            if (stopAssign) {
+                logger.warn("assign is stopped");
+                return;
+            }
+            // get sortTask conf from manager
+            if (queryConsumeConfig != null) {
+                long start = System.currentTimeMillis();
+                context.getDefaultStateCounter().addRequestManagerTimes(1);
+                ConsumeConfig consumeConfig = queryConsumeConfig
+                        .queryCurrentConsumeConfig(context.getConfig().getSortTaskId());
+                context.getDefaultStateCounter().addRequestManagerTimeCost(System.currentTimeMillis() - start);
+
+                if (consumeConfig != null) {
+                    handleUpdatedConsumeConfig(consumeConfig.getTopics());
+                } else {
+                    logger.warn("subscribedInfo is null");
+                    context.getDefaultStateCounter().addRequestManagerFailTimes(1);
+                }
+            } else {
+                logger.error("subscribedMetaDataInfo is null");
+            }
+        }
+    }
+}