You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by GitBox <gi...@apache.org> on 2022/09/16 06:46:12 UTC

[GitHub] [inlong] luchunliang commented on a diff in pull request #5843: [INLONG-5842][Manager] Support maintenance of message queue cluster

luchunliang commented on code in PR #5843:
URL: https://github.com/apache/inlong/pull/5843#discussion_r972667887


##########
inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/queue/MessageQueueServiceImpl.java:
##########
@@ -0,0 +1,555 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster.queue;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
+import org.apache.inlong.common.msg.AttributeConstants;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueClearTopicRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueControlRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueOfflineRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueOnlineRequest;
+import org.apache.inlong.manager.pojo.cluster.queue.MessageQueueSynchronizeTopicRequest;
+import org.apache.inlong.manager.pojo.cluster.tubemq.TubeClusterInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupPageRequest;
+import org.apache.inlong.manager.service.resource.queue.tubemq.TubeMQOperator;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.PulsarAdminBuilder;
+import org.apache.pulsar.client.admin.Topics;
+import org.apache.pulsar.client.api.AuthenticationFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Inlong message queue cluster operator.
+ */
+@Service
+public class MessageQueueServiceImpl implements MessageQueueService {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MessageQueueServiceImpl.class);
+
+    public static final Splitter.MapSplitter MAP_SPLITTER = Splitter.on(AttributeConstants.SEPARATOR)
+            .trimResults().withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+    public static final Joiner.MapJoiner MAP_JOINER = Joiner.on(AttributeConstants.SEPARATOR)
+            .withKeyValueSeparator(AttributeConstants.KEY_VALUE_SEPARATOR);
+    public static final String KEY_PRODUCER = "producer";
+    public static final String KEY_CONSUMER = "consumer";
+    public static final String KEY_ADMIN_URL = "adminUrl";
+    public static final String KEY_AUTHENTICATION = "authentication";
+    public static final String KEY_TENANT = "tenant";
+    public static final String KEY_NAMESPACE = "namespace";
+    public static final String KEY_NUM_PARTITIONS = "numPartitions";
+    public static final String KEY_REPLICATION_FACTOR = "replicationFactor";
+    public static final int DEFAULT_NUM_PARTITIONS = 10;
+    public static final short DEFAULT_REPLICATION_FACTOR = 2;
+
+    @Autowired
+    private InlongClusterEntityMapper clusterMapper;
+    @Autowired
+    private InlongGroupEntityMapper groupMapper;
+    @Autowired
+    private TubeMQOperator tubeMQOperator;
+
+    /**
+     * Control produce operation and consume operation of Inlong message queue cluster 
+     */
+    @Override
+    public String control(MessageQueueControlRequest request) {
+        String name = request.getName();
+        // check parameters
+        if (StringUtils.isEmpty(name)) {
+            return "miss cluster name.";
+        }
+        Boolean canProduce = request.getCanProduce();
+        if (canProduce == null) {
+            canProduce = Boolean.FALSE;
+        }
+        Boolean canConsume = request.getCanConsume();
+        if (canConsume == null) {
+            canConsume = Boolean.FALSE;
+        }
+        // find cluster
+        List<InlongClusterEntity> clusters = clusterMapper.selectByKey(null, name, null);
+        if (clusters.size() <= 0) {
+            return "Can not find a cluster by name:" + name;
+        }
+        if (clusters.size() > 1) {
+            return String.format("Cluster:%s data is more than 1.", name);
+        }
+        for (InlongClusterEntity cluster : clusters) {
+            String strExtTag = cluster.getExtTag();
+            strExtTag = StringUtils.trimToEmpty(strExtTag);
+            Map<String, String> extTagMap = new HashMap<>(MAP_SPLITTER.split(strExtTag));
+            extTagMap.put(KEY_PRODUCER, canProduce.toString());
+            extTagMap.put(KEY_CONSUMER, canConsume.toString());
+            String newExtTag = MAP_JOINER.join(extTagMap);
+            cluster.setExtTag(newExtTag);
+            clusterMapper.updateById(cluster);
+        }
+        return null;
+    }
+
+    /**
+     * Build relationships between DataProxy cluster and MessageQueue cluster
+     */
+    public String online(MessageQueueOnlineRequest request) {
+        String mqClusterName = request.getMqClusterName();
+        String proxyClusterName = request.getProxyClusterName();
+        return this.updateExtTag(mqClusterName, proxyClusterName, true);
+    }
+
+    /**
+     * Remove relationships between DataProxy cluster and MessageQueue cluster
+     */
+    public String offline(MessageQueueOfflineRequest request) {
+        String mqClusterName = request.getMqClusterName();
+        String proxyClusterName = request.getProxyClusterName();
+        return this.updateExtTag(mqClusterName, proxyClusterName, false);
+    }
+
+    /**
+     * updateExtTag
+     */
+    private String updateExtTag(String mqClusterName, String proxyClusterName, boolean isAppendTag) {
+        // check parameters
+        if (StringUtils.isEmpty(mqClusterName)) {
+            return "miss message queue cluster name.";
+        }
+        if (StringUtils.isEmpty(proxyClusterName)) {
+            return "miss DataProxy cluster name.";
+        }
+        // find cluster
+        List<InlongClusterEntity> mqClusters = clusterMapper.selectByKey(null, mqClusterName, null);
+        if (mqClusters.size() <= 0) {
+            return "Can not find message queue cluster by name:" + mqClusterName;
+        }
+        if (mqClusters.size() > 1) {
+            return String.format("MessageQueue cluster:%s data is more than 1.", mqClusterName);
+        }
+        List<InlongClusterEntity> proxyClusters = clusterMapper.selectByKey(null, proxyClusterName, null);
+        if (proxyClusters.size() <= 0) {
+            return "Can not find DataProxy cluster by name:" + proxyClusterName;
+        }
+        if (proxyClusters.size() > 1) {
+            return String.format("DataProxy cluster:%s data is more than 1.", proxyClusterName);
+        }
+
+        // parse DataProxy extTag
+        InlongClusterEntity proxyCluster = proxyClusters.get(0);
+        String strProxyExtTag = proxyCluster.getExtTag();
+        strProxyExtTag = StringUtils.trimToEmpty(strProxyExtTag);
+        Map<String, String> proxyExtTagMap = MAP_SPLITTER.split(strProxyExtTag);
+        // parse MessageQueue extTag
+        InlongClusterEntity mqCluster = mqClusters.get(0);
+        String strMqExtTag = mqCluster.getExtTag();
+        strMqExtTag = StringUtils.trimToEmpty(strMqExtTag);
+        Map<String, String> mqExtTagMap = new HashMap<>(MAP_SPLITTER.split(strMqExtTag));
+        // update extTag
+        if (isAppendTag) {
+            // append DataProxy extTag to MessageQueue extTag
+            proxyExtTagMap.forEach((k, v) -> mqExtTagMap.put(k, v));
+        } else {
+            // remove DataProxy extTag from MessageQueue extTag
+            proxyExtTagMap.forEach((k, v) -> mqExtTagMap.remove(k));
+        }
+        // update MessageQueue
+        String newExtTag = MAP_JOINER.join(mqExtTagMap);
+        mqCluster.setExtTag(newExtTag);
+        clusterMapper.updateById(mqCluster);
+        return null;
+    }
+
+    /**
+     * Synchronize all topic from cluster tag to message queue cluster
+     */
+    public String synchronizeTopic(MessageQueueSynchronizeTopicRequest request) {
+        String mqClusterName = request.getName();
+        // check parameters
+        if (StringUtils.isEmpty(mqClusterName)) {
+            return "miss message queue cluster name.";
+        }
+        // find cluster
+        List<InlongClusterEntity> mqClusters = clusterMapper.selectByKey(null, mqClusterName, null);
+        if (mqClusters.size() <= 0) {
+            return "Can not find message queue cluster by name:" + mqClusterName;
+        }
+        if (mqClusters.size() > 1) {
+            return String.format("MessageQueue cluster:%s data is more than 1.", mqClusterName);
+        }
+        // find cluster tag
+        InlongClusterEntity mqCluster = mqClusters.get(0);
+        String clusterTag = mqCluster.getClusterTags();
+        if (StringUtils.isEmpty(clusterTag)) {
+            return String.format("Cluster tag of message queue cluster:%s is null.", mqClusterName);
+        }
+        // find group entities
+        InlongGroupPageRequest groupRequest = new InlongGroupPageRequest();
+        groupRequest.setIsAdminRole(true);
+        groupRequest.setClusterTagList(Collections.singletonList(clusterTag));
+        List<InlongGroupEntity> groupEntities = groupMapper.selectByCondition(groupRequest);
+        String mqType = mqCluster.getType();
+        if (StringUtils.equalsIgnoreCase(mqType, MQType.PULSAR)) {
+            return this.createPulsarTopic(mqCluster, groupEntities);
+        } else if (StringUtils.equalsIgnoreCase(mqType, MQType.KAFKA)) {
+            return this.createKafkaTopic(mqCluster, groupEntities);
+        } else if (StringUtils.equalsAnyIgnoreCase(mqType, MQType.TUBEMQ)) {
+            return this.createTubeTopic(mqCluster, groupEntities);
+        } else {
+            return String.format("Unknown message queue type:%s", mqType);
+        }
+    }
+
+    /**
+     * createPulsarTopic
+     */
+    private String createPulsarTopic(InlongClusterEntity cluster, List<InlongGroupEntity> groupInfos) {

Review Comment:
   It is different, ClusterTagService create a topic for every MessageQueue cluster.
   MessageQueueService create all topic of this cluster tag.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@inlong.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org