Posted to commits@inlong.apache.org by do...@apache.org on 2022/10/12 08:11:03 UTC

[inlong] branch master updated: [INLONG-5495][SDK] Support multi-topic manager (#5802)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 08dfa0ecc [INLONG-5495][SDK] Support multi-topic manager  (#5802)
08dfa0ecc is described below

commit 08dfa0ecc48944f6fc45197827c177e9eb07d528
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Oct 12 16:10:58 2022 +0800

    [INLONG-5495][SDK] Support multi-topic manager  (#5802)
---
 .../sdk/sort/api/InlongTopicManagerFactory.java    |   3 +-
 .../sort/fetcher/kafka/AckOffsetOnRebalance.java   |  10 +-
 .../fetcher/kafka/KafkaSingleTopicFetcher.java     |   2 +-
 .../sort/impl/kafka/InLongKafkaFetcherImpl.java    |   2 +-
 .../sdk/sort/manager/InlongMultiTopicManager.java  | 289 +++++++++++++++++++++
 5 files changed, 300 insertions(+), 6 deletions(-)

diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
index a8219f95f..834315236 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/api/InlongTopicManagerFactory.java
@@ -19,6 +19,7 @@
 package org.apache.inlong.sdk.sort.api;
 
 import org.apache.inlong.sdk.sort.api.SortClientConfig.TopicType;
+import org.apache.inlong.sdk.sort.manager.InlongMultiTopicManager;
 import org.apache.inlong.sdk.sort.manager.InlongSingleTopicManager;
 
 /**
@@ -47,6 +48,6 @@ public class InlongTopicManagerFactory {
     public static TopicManager createMultiTopicManager(
             ClientContext context,
             QueryConsumeConfig queryConsumeConfig) {
-        return null;
+        return new InlongMultiTopicManager(context, queryConsumeConfig);
     }
 }
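
A rough usage sketch of the factory change above (not part of the commit): with
createMultiTopicManager now returning an InlongMultiTopicManager instead of null,
client code can obtain the manager directly from the factory. The class name
MultiTopicManagerUsageSketch and the way ClientContext and QueryConsumeConfig are
obtained are assumptions for illustration; only the factory call itself comes from
the diff above.

    // Illustrative sketch only, assuming context and queryConsumeConfig are
    // built by the SDK bootstrap code elsewhere.
    import org.apache.inlong.sdk.sort.api.ClientContext;
    import org.apache.inlong.sdk.sort.api.InlongTopicManagerFactory;
    import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
    import org.apache.inlong.sdk.sort.api.TopicManager;

    public class MultiTopicManagerUsageSketch {

        public static TopicManager create(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
            // Before this commit the factory returned null here; it now builds an
            // InlongMultiTopicManager that keeps one fetcher group per cluster.
            return InlongTopicManagerFactory.createMultiTopicManager(context, queryConsumeConfig);
        }
    }
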
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
index 0cf27bc82..d0a66c6e3 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/AckOffsetOnRebalance.java
@@ -51,9 +51,13 @@ public class AckOffsetOnRebalance implements ConsumerRebalanceListener {
     private final AtomicLong revokedNum = new AtomicLong(0);
     private final AtomicLong assignedNum = new AtomicLong(0);
 
-    public AckOffsetOnRebalance(String clusterId, Seeker seeker,
-                                ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap) {
-        this(clusterId, seeker, commitOffsetMap, null, null);
+    public AckOffsetOnRebalance(
+            String clusterId,
+            Seeker seeker,
+            ConcurrentHashMap<TopicPartition,
+            OffsetAndMetadata> commitOffsetMap,
+            KafkaConsumer<byte[], byte[]> consumer) {
+        this(clusterId, seeker, commitOffsetMap, null, consumer);
     }
 
     public AckOffsetOnRebalance(
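
The body of AckOffsetOnRebalance is outside this hunk, so the following is only a
hedged sketch of the general Kafka pattern that passing the consumer enables: a
rebalance listener that holds the consumer can commit its tracked offsets before
partitions are revoked. The class name CommitOnRevokeListenerSketch and its
internals are illustrative assumptions, not the SDK's actual implementation.

    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;

    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class CommitOnRevokeListenerSketch implements ConsumerRebalanceListener {

        private final ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap;
        private final KafkaConsumer<byte[], byte[]> consumer;

        public CommitOnRevokeListenerSketch(
                ConcurrentHashMap<TopicPartition, OffsetAndMetadata> commitOffsetMap,
                KafkaConsumer<byte[], byte[]> consumer) {
            this.commitOffsetMap = commitOffsetMap;
            this.consumer = consumer;
        }

        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            // Flush whatever has been acked so far before the partitions are taken away.
            if (!commitOffsetMap.isEmpty()) {
                consumer.commitSync(commitOffsetMap);
                commitOffsetMap.clear();
            }
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            // In the SDK, seeking to the correct start offsets is handled by the Seeker.
        }
    }
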
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
index 9ed97261a..3f903720b 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/fetcher/kafka/KafkaSingleTopicFetcher.java
@@ -79,7 +79,7 @@ public class KafkaSingleTopicFetcher extends SingleTopicFetcher {
                 this.seeker = SeekerFactory.createKafkaSeeker(consumer, topic);
                 consumer.subscribe(Collections.singletonList(topic.getTopic()),
                         new AckOffsetOnRebalance(this.topic.getInLongCluster().getClusterId(), seeker,
-                                commitOffsetMap));
+                                commitOffsetMap, consumer));
             } else {
                 LOGGER.info("consumer is null");
                 return false;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
index d2da76c0f..2dd9c2cf1 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/kafka/InLongKafkaFetcherImpl.java
@@ -77,7 +77,7 @@ public class InLongKafkaFetcherImpl extends InLongTopicFetcher {
                 this.seeker = SeekerFactory.createKafkaSeeker(consumer, inLongTopic);
                 consumer.subscribe(Collections.singletonList(inLongTopic.getTopic()),
                         new AckOffsetOnRebalance(this.inLongTopic.getInLongCluster().getClusterId(), seeker,
-                                commitOffsetMap));
+                                commitOffsetMap, consumer));
             } else {
                 logger.info("consumer is null");
                 return false;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
new file mode 100644
index 000000000..f19f9261c
--- /dev/null
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.manager;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
+import org.apache.inlong.sdk.sort.util.PeriodicTask;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Inlong manager that maintains the {@link org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}.
+ * It is suitable for cases where topics share the same configuration,
+ * and each consumer consumes multiple topics.
+ */
+public class InlongMultiTopicManager extends TopicManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongMultiTopicManager.class);
+
+    private final Map<String, List<TopicFetcher>> pulsarFetchers = new ConcurrentHashMap<>();
+    private final Map<String, List<TopicFetcher>> kafkaFetchers = new ConcurrentHashMap<>();
+    private final Map<String, List<TopicFetcher>> tubeFetchers = new ConcurrentHashMap<>();
+    private final Map<String, TopicFetcher> allFetchers = new ConcurrentHashMap<>();
+    private Set<String> allTopics = new HashSet<>();
+    private final PeriodicTask updateMetaDataWorker;
+
+    private boolean stopAssign = false;
+    private int consumerSize;
+
+    public InlongMultiTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
+        super(context, queryConsumeConfig);
+        this.consumerSize = context.getConfig().getMaxConsumerSize();
+        updateMetaDataWorker = new UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(),
+                TimeUnit.SECONDS);
+        String threadName = "sortsdk_multi_topic_manager_" + context.getConfig().getSortTaskId()
+                + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
+        updateMetaDataWorker.start(threadName);
+        LOGGER.info("create InlongMultiTopicManager success");
+    }
+
+    @Override
+    public boolean clean() {
+        LOGGER.info("start clean {}", context.getConfig().getSortTaskId());
+        close();
+        offlineAllTopicsAndPartitions();
+        LOGGER.info("end clean {}", context.getConfig().getSortTaskId());
+        return true;
+    }
+
+    @Override
+    public TopicFetcher addTopic(InLongTopic topic) {
+        return null;
+    }
+
+    @Override
+    public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) {
+        return null;
+    }
+
+    @Override
+    public TopicFetcher getFetcher(String fetchKey) {
+        return allFetchers.get(fetchKey);
+    }
+
+    @Override
+    public Collection<TopicFetcher> getAllFetchers() {
+        return allFetchers.values();
+    }
+
+    @Override
+    public Set<String> getManagedInLongTopics() {
+        return allTopics;
+    }
+
+    @Override
+    public void offlineAllTopicsAndPartitions() {
+        String subscribeId = context.getConfig().getSortTaskId();
+        try {
+            LOGGER.info("start offline {}", subscribeId);
+            stopAssign = true;
+            Set<Map.Entry<String, TopicFetcher>> entries = allFetchers.entrySet();
+            for (Map.Entry<String, TopicFetcher> entry : entries) {
+                String fetchKey = entry.getKey();
+                TopicFetcher topicFetcher = entry.getValue();
+                boolean succ = false;
+                if (topicFetcher != null) {
+                    try {
+                        succ = topicFetcher.close();
+                    } catch (Exception e) {
+                        LOGGER.error("got exception when close fetcher={}", topicFetcher.getTopics(), e);
+                    }
+                }
+                LOGGER.info("close fetcher={} {}", fetchKey, succ);
+            }
+        } catch (Exception e) {
+            LOGGER.error("got exception when offline topics and partitions, ", e);
+        } finally {
+            allFetchers.clear();
+            kafkaFetchers.clear();
+            pulsarFetchers.clear();
+            tubeFetchers.clear();
+            stopAssign = false;
+            LOGGER.info("close finished {}", subscribeId);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (updateMetaDataWorker != null) {
+            updateMetaDataWorker.stop();
+        }
+    }
+
+    private void handleUpdatedConsumeConfig(List<InLongTopic> assignedTopics) {
+        if (CollectionUtils.isEmpty(assignedTopics)) {
+            LOGGER.warn("assignedTopics is null or empty, do nothing");
+            return;
+        }
+        this.allTopics = assignedTopics.stream()
+                .map(InLongTopic::getTopic)
+                .collect(Collectors.toSet());
+
+        assignedTopics.stream()
+                .filter(topic -> InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()))
+                .forEach(this::updateKafkaFetcher);
+
+        assignedTopics.stream()
+                .filter(topic -> InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()))
+                .forEach(this::updatePulsarFetcher);
+
+        assignedTopics.stream()
+                .filter(topic -> InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()))
+                .forEach(this::updateTubeFetcher);
+    }
+
+    private void updateKafkaFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = kafkaFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        String bootstraps = topics.stream().findFirst().get().getInLongCluster().getBootstraps();
+        TopicFetcherBuilder builder = TopicFetcherBuilder.newKafkaBuilder()
+                .bootstrapServers(bootstraps)
+                .topic(topics)
+                .context(context);
+        LOGGER.info("create new kafka multi topic consumer for bootstrap {}, size is {}", bootstraps, consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            fetchers.add(builder.subscribe());
+        }
+        fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher));
+    }
+
+    private void updatePulsarFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = pulsarFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        InLongTopic topic = topics.stream().findFirst().get();
+        LOGGER.info("create new pulsar multi topic consumer for bootstrap {}, size is {}",
+                topic.getInLongCluster().getBootstraps(), consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            try {
+                PulsarClient pulsarClient = PulsarClient.builder()
+                        .serviceUrl(topic.getInLongCluster().getBootstraps())
+                        .authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+                        .build();
+                TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder()
+                        .pulsarClient(pulsarClient)
+                        .topic(topics)
+                        .context(context)
+                        .subscribe();
+                fetchers.add(fetcher);
+                allFetchers.put(fetcher.getFetchKey(), fetcher);
+            } catch (PulsarClientException e) {
+                LOGGER.error("failed to create pulsar client for {}\n", topic.getInLongCluster().getBootstraps(), e);
+            }
+        }
+    }
+
+    private void updateTubeFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = tubeFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        InLongTopic topic = topics.stream().findFirst().get();
+        LOGGER.info("create new tube multi topic consumer for bootstrap {}, size is {}",
+                topic.getInLongCluster().getBootstraps(), consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            try {
+                TubeClientConfig tubeConfig = new TubeClientConfig(topic.getInLongCluster().getBootstraps());
+                MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig);
+                TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory,
+                        tubeConfig);
+                topics.forEach(tubeTopic -> {
+                    TopicFetcher fetcher = TopicFetcherBuilder.newTubeBuilder()
+                            .tubeConsumerCreater(tubeConsumerCreator)
+                            .topic(tubeTopic)
+                            .context(context)
+                            .subscribe();
+                    fetchers.add(fetcher);
+                });
+            } catch (TubeClientException e) {
+                LOGGER.error("failed to create tube client for {}\n", topic.getInLongCluster().getBootstraps(), e);
+            }
+        }
+        fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher));
+    }
+
+    private class UpdateMetaDataThread extends PeriodicTask {
+
+        public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) {
+            super(runInterval, timeUnit, context.getConfig());
+        }
+
+        @Override
+        protected void doWork() {
+            logger.debug("InLongTopicManagerImpl doWork");
+            if (stopAssign) {
+                logger.warn("assign is stopped");
+                return;
+            }
+            // get sortTask conf from manager
+            if (queryConsumeConfig != null) {
+                long start = System.currentTimeMillis();
+                context.getDefaultStateCounter().addRequestManagerTimes(1);
+                ConsumeConfig consumeConfig = queryConsumeConfig
+                        .queryCurrentConsumeConfig(context.getConfig().getSortTaskId());
+                context.getDefaultStateCounter().addRequestManagerTimeCost(System.currentTimeMillis() - start);
+
+                if (consumeConfig != null) {
+                    handleUpdatedConsumeConfig(consumeConfig.getTopics());
+                } else {
+                    logger.warn("subscribedInfo is null");
+                    context.getDefaultStateCounter().addRequestManagerFailTimes(1);
+                }
+            } else {
+                logger.error("subscribedMetaDataInfo is null");
+            }
+        }
+    }
+}
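
To make the grouping step in handleUpdatedConsumeConfig concrete: assigned topics
are bucketed by cluster id, and one group of consumerSize fetchers is created (or
updated) per bucket, so that all topics of a cluster are shared by the same
fetchers. The standalone sketch below shows the same Collectors.groupingBy
pattern; the Topic class and the cluster ids are hypothetical stand-ins for
InLongTopic and getInLongCluster().getClusterId().

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class GroupByClusterSketch {

        static class Topic {
            final String clusterId;
            final String name;

            Topic(String clusterId, String name) {
                this.clusterId = clusterId;
                this.name = name;
            }
        }

        public static void main(String[] args) {
            List<Topic> assigned = Arrays.asList(
                    new Topic("clusterA", "topic1"),
                    new Topic("clusterA", "topic2"),
                    new Topic("clusterB", "topic3"));
            // Same shape as in handleUpdatedConsumeConfig: clusterId -> topics on that cluster.
            Map<String, List<Topic>> byCluster = assigned.stream()
                    .collect(Collectors.groupingBy(t -> t.clusterId));
            // The manager would then build one fetcher group per key.
            byCluster.forEach((cluster, topics) ->
                    System.out.println(cluster + " -> " + topics.size() + " topics"));
        }
    }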