You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/16 02:48:23 UTC

[GitHub] [inlong] vernedeng commented on a diff in pull request #5802: [INLONG-5495][SDK] Support multi-topic manager

vernedeng commented on code in PR #5802:
URL: https://github.com/apache/inlong/pull/5802#discussion_r972565660


##########
inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/manager/InlongMultiTopicManager.java:
##########
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.sort.manager;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.inlong.sdk.sort.api.ClientContext;
+import org.apache.inlong.sdk.sort.api.InlongTopicTypeEnum;
+import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
+import org.apache.inlong.sdk.sort.api.TopicFetcher;
+import org.apache.inlong.sdk.sort.api.TopicFetcherBuilder;
+import org.apache.inlong.sdk.sort.api.TopicManager;
+import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
+import org.apache.inlong.sdk.sort.entity.InLongTopic;
+import org.apache.inlong.sdk.sort.fetcher.tube.TubeConsumerCreator;
+import org.apache.inlong.sdk.sort.util.PeriodicTask;
+import org.apache.inlong.sdk.sort.util.StringUtil;
+import org.apache.inlong.tubemq.client.config.TubeClientConfig;
+import org.apache.inlong.tubemq.client.exception.TubeClientException;
+import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
+import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * Inlong manager that maintain the {@link org.apache.inlong.sdk.sort.api.MultiTopicsFetcher}.
+ * It is suitable to the cases that topics share the same configurations.
+ * And each consumer will consume multi topic.
+ */
+public class InlongMultiTopicManager extends TopicManager {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongMultiTopicManager.class);
+
+    private final Map<String, List<TopicFetcher>> pulsarFetchers = new ConcurrentHashMap<>();
+    private final Map<String, List<TopicFetcher>> kafkaFetchers = new ConcurrentHashMap<>();
+    private final Map<String, List<TopicFetcher>> tubeFetchers = new ConcurrentHashMap<>();
+    private final Map<String, TopicFetcher> allFetchers = new ConcurrentHashMap<>();
+    private Set<String> allTopics = new HashSet<>();
+    private final PeriodicTask updateMetaDataWorker;
+
+    private boolean stopAssign = false;
+    private int consumerSize;
+
+    public InlongMultiTopicManager(ClientContext context, QueryConsumeConfig queryConsumeConfig) {
+        super(context, queryConsumeConfig);
+        this.consumerSize = context.getConfig().getMaxConsumerSize();
+        updateMetaDataWorker = new UpdateMetaDataThread(context.getConfig().getUpdateMetaDataIntervalSec(),
+                TimeUnit.SECONDS);
+        String threadName = "sortsdk_multi_topic_manager_" + context.getConfig().getSortTaskId()
+                + "_" + StringUtil.formatDate(new Date(), "yyyy-MM-dd HH:mm:ss");
+        updateMetaDataWorker.start(threadName);
+        LOGGER.info("create InlongMultiTopicManager success");
+    }
+
+    @Override
+    public boolean clean() {
+        LOGGER.info("start clean {}", context.getConfig().getSortTaskId());
+        close();
+        offlineAllTopicsAndPartitions();
+        LOGGER.info("end clean {}", context.getConfig().getSortTaskId());
+        return true;
+    }
+
+    @Override
+    public TopicFetcher addTopic(InLongTopic topic) {
+        return null;
+    }
+
+    @Override
+    public TopicFetcher removeTopic(InLongTopic topic, boolean closeFetcher) {
+        return null;
+    }
+
+    @Override
+    public TopicFetcher getFetcher(String fetchKey) {
+        return allFetchers.get(fetchKey);
+    }
+
+    @Override
+    public Collection<TopicFetcher> getAllFetchers() {
+        return allFetchers.values();
+    }
+
+    @Override
+    public Set<String> getManagedInLongTopics() {
+        return allTopics;
+    }
+
+    @Override
+    public void offlineAllTopicsAndPartitions() {
+        String subscribeId = context.getConfig().getSortTaskId();
+        try {
+            LOGGER.info("start offline {}", subscribeId);
+            stopAssign = true;
+            Set<Map.Entry<String, TopicFetcher>> entries = allFetchers.entrySet();
+            for (Map.Entry<String, TopicFetcher> entry : entries) {
+                String fetchKey = entry.getKey();
+                TopicFetcher topicFetcher = entry.getValue();
+                boolean succ = false;
+                if (topicFetcher != null) {
+                    try {
+                        succ = topicFetcher.close();
+                    } catch (Exception e) {
+                        LOGGER.error(e.getMessage(), e);
+                    }
+                }
+                LOGGER.info("close fetcher{} {}", fetchKey, succ);
+            }
+        } catch (Exception e) {
+            LOGGER.error(e.getMessage(), e);
+        } finally {
+            allFetchers.clear();
+            kafkaFetchers.clear();
+            pulsarFetchers.clear();
+            tubeFetchers.clear();
+            stopAssign = false;
+            LOGGER.info("close finished {}", subscribeId);
+        }
+    }
+
+    @Override
+    public void close() {
+        if (updateMetaDataWorker != null) {
+            updateMetaDataWorker.stop();
+        }
+    }
+
+    private void handleUpdatedConsumeConfig(List<InLongTopic> assignedTopics) {
+        if (CollectionUtils.isEmpty(assignedTopics)) {
+            LOGGER.warn("assignedTopics is null or empty, do nothing");
+            return;
+        }
+        this.allTopics = assignedTopics.stream()
+                        .map(InLongTopic::getTopic)
+                        .collect(Collectors.toSet());
+
+        assignedTopics.stream()
+                .filter(topic -> InlongTopicTypeEnum.KAFKA.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()))
+                .forEach(this::updateKafkaFetcher);
+
+        assignedTopics.stream()
+                .filter(topic -> InlongTopicTypeEnum.PULSAR.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()))
+                .forEach(this::updatePulsarFetcher);
+
+        assignedTopics.stream()
+                .filter(topic -> InlongTopicTypeEnum.TUBE.getName().equalsIgnoreCase(topic.getTopicType()))
+                .collect(Collectors.groupingBy(topic -> topic.getInLongCluster().getClusterId()))
+                .forEach(this::updateTubeFetcher);
+    }
+
+    private void updateKafkaFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = kafkaFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        String bootstraps = topics.stream().findFirst().get().getInLongCluster().getBootstraps();
+        TopicFetcherBuilder builder =  TopicFetcherBuilder.newKafkaBuilder()
+                .bootstrapServers(bootstraps)
+                .topic(topics)
+                .context(context);
+        LOGGER.info("create new kafka multi topic consumer for bootstrap {}, size is {}", bootstraps, consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            fetchers.add(builder.subscribe());
+        }
+        fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher));
+    }
+
+    private void updatePulsarFetcher(String clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = pulsarFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        InLongTopic topic = topics.stream().findFirst().get();
+        LOGGER.info("create new pulsar multi topic consumer for bootstrap {}, size is {}",
+                topic.getInLongCluster().getBootstraps(), consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            try {
+                PulsarClient pulsarClient = PulsarClient.builder()
+                        .serviceUrl(topic.getInLongCluster().getBootstraps())
+                        .authentication(AuthenticationFactory.token(topic.getInLongCluster().getToken()))
+                        .build();
+                TopicFetcher fetcher = TopicFetcherBuilder.newPulsarBuilder()
+                        .pulsarClient(pulsarClient)
+                        .topic(topics)
+                        .context(context)
+                        .subscribe();
+                fetchers.add(fetcher);
+            } catch (PulsarClientException e) {
+                LOGGER.error("failed to create pulsar client for {}\n", topic.getInLongCluster().getBootstraps(), e);
+            }
+        }
+        fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher));
+    }
+
+    private void updateTubeFetcher(String  clusterId, List<InLongTopic> topics) {
+        List<TopicFetcher> fetchers = tubeFetchers.computeIfAbsent(clusterId, k -> new ArrayList<>());
+        if (CollectionUtils.isNotEmpty(fetchers)) {
+            fetchers.forEach(fetcher -> fetcher.updateTopics(topics));
+            return;
+        }
+        InLongTopic topic = topics.stream().findFirst().get();
+        LOGGER.info("create new tube multi topic consumer for bootstrap {}, size is {}",
+                topic.getInLongCluster().getBootstraps(), consumerSize);
+        for (int i = 0; i < consumerSize; i++) {
+            try {
+                TubeClientConfig tubeConfig = new TubeClientConfig(topic.getInLongCluster().getBootstraps());
+                MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(tubeConfig);
+                TubeConsumerCreator tubeConsumerCreator = new TubeConsumerCreator(messageSessionFactory,
+                        tubeConfig);
+                topics.forEach(tubeTopic -> {
+                    TopicFetcher fetcher = TopicFetcherBuilder.newTubeBuilder()
+                            .tubeConsumerCreater(tubeConsumerCreator)
+                            .topic(tubeTopic)
+                            .context(context)
+                            .subscribe();
+                    fetchers.add(fetcher);
+                });
+            } catch (TubeClientException e) {
+                LOGGER.error("failed to create tube client for {}\n", topic.getInLongCluster().getBootstraps(), e);
+            }
+        }
+        fetchers.forEach(topicFetcher -> allFetchers.put(topicFetcher.getFetchKey(), topicFetcher));
+    }
+
+    private class UpdateMetaDataThread extends PeriodicTask {
+
+        public UpdateMetaDataThread(long runInterval, TimeUnit timeUnit) {
+            super(runInterval, timeUnit, context.getConfig());
+        }
+
+        @Override
+        protected void doWork() {
+            logger.debug("InLongTopicManagerImpl doWork");

Review Comment:
   The logger is inherited from base class ***PeriodicTask***, and it is not a static variables



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org