You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 08:50:29 UTC

[incubator-inlong] branch master updated: [INLONG-694] Add retry mechanism for creating tube consumer group

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 348b912  [INLONG-694] Add retry mechanism for creating tube consumer group
348b912 is described below

commit 348b912587fcc3c08ddc2d8bfbff230c1b79d170
Author: ireliasheng <ir...@tencent.com>
AuthorDate: Fri Jul 9 15:42:05 2021 +0800

    [INLONG-694] Add retry mechanism for creating tube consumer group
---
 .../inlong/manager/common/beans/TryBean.java       |  44 ++++++++
 .../CreateTubeConsumeGroupTaskEventListener.java   | 121 +++++++++++++++++++++
 .../service/thirdpart/mq/TubeMqOptService.java     |  49 ++++++---
 3 files changed, 197 insertions(+), 17 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
new file mode 100644
index 0000000..d9606c4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.beans;
+
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+@Data
+@Component
+public class TryBean {
+
+    /**
+     * Maximum number of attempts
+     */
+    private Integer maxAttempts = 3;
+    /**
+     * The first delay time, in milliseconds
+     */
+    private Long delay = 3000L;
+    /**
+     * The max delay time, in milliseconds
+     */
+    private Long maxDelay = 300000L;
+    /**
+     * Delay time increase factor
+     */
+    private Integer multiplier = 2;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
new file mode 100644
index 0000000..31af6ee
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.TryBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
+import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
+import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CreateTubeConsumeGroupTaskEventListener implements TaskEventListener {
+
+    @Autowired
+    BusinessService businessService;
+
+    @Autowired
+    ClusterInfoMapper clusterInfoMapper;
+
+    @Autowired
+    TubeMqOptService tubeMqOptService;
+
+    @Value("${cluster.tube.clusterId}")
+    Integer clusterId;
+
+    @Autowired
+    TryBean tryBean;
+
+    @Override
+    public TaskEvent event() {
+        return TaskEvent.COMPLETE;
+    }
+
+
+    @Override
+    public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+        CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+        String bid = form.getBusinessId();
+
+        log.info("try to create consumer group for bid {}", bid);
+
+        BusinessInfo businessInfo = businessService.get(bid);
+
+        String topicName = businessInfo.getMqResourceObj();
+
+        QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
+                .topicName(topicName).clusterId(clusterId)
+                .user(businessInfo.getCreator()).build();
+        // Query whether the tube topic exists
+        boolean topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
+
+        Integer tryNumber = tryBean.getMaxAttempts();
+        Long delay = tryBean.getDelay();
+        while (--tryNumber > 0) {
+            if (topicExist) {
+                break;
+            }
+
+            try {
+                Thread.sleep(delay);
+                delay *= tryBean.getMultiplier();
+                topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
+            } catch (InterruptedException e) {
+                log.error("Try to determine whether the tube topic exists {}", e.getMessage());
+            }
+
+        }
+        log.info("Try to determine whether the tube topic exists ,try number is {}", tryNumber);
+        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
+        addTubeConsumeGroupRequest.setClusterId(clusterId);
+        addTubeConsumeGroupRequest.setCreateUser(businessInfo.getCreator());
+
+        GroupNameJsonSetBean groupNameJsonSetBean = new GroupNameJsonSetBean();
+        groupNameJsonSetBean.setTopicName(topicName);
+        String consumeGroupName = "sort_" + topicName + "_consumer_group";
+        groupNameJsonSetBean.setGroupName(consumeGroupName);
+        addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(groupNameJsonSetBean));
+
+        try {
+            tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+        } catch (Exception e) {
+            log.error("create tube consume group for bid={} error {}", bid, e.getMessage(), e);
+        }
+        log.info("finish to create consumer group for {}", bid);
+        return ListenerResult.success();
+    }
+
+    @Override
+    public boolean async() {
+        return false;
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java
index cfd3da8..4eae2f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java
@@ -56,20 +56,21 @@ public class TubeMqOptService {
             }
             AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
             QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
-                    .topicName(addTopicTasksBean.getTopicName())
-                    .clusterId(clusterBean.getClusterId())
-                    .user(request.getUser())
-                    .build();
+                    .topicName(addTopicTasksBean.getTopicName()).clusterId(clusterBean.getClusterId())
+                    .user(request.getUser()).build();
 
             String tubeManager = clusterBean.getTubeManager();
-            TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
-                    HttpMethod.POST, GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
+            TubeManagerResponse response = httpUtils
+                    .request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
+                            GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
             if (response.getErrCode() == 101) { // topic already exists
-                log.info("create tube topic {} on {} ", GSON.toJson(request),
+                log.info(" create tube topic  {}  on {} ", GSON.toJson(request),
                         tubeManager + "/v1/task?method=addTopicTask");
+
                 request.setClusterId(clusterBean.getClusterId());
-                httpUtils.request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
-                        GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
+                TubeManagerResponse createRsp = httpUtils
+                        .request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
+                                GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
             } else {
                 log.warn("topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
             }
@@ -83,12 +84,10 @@ public class TubeMqOptService {
         HttpHeaders httpHeaders = new HttpHeaders();
         httpHeaders.add("Content-Type", "application/json");
         try {
-            log.info(" create tube consumer group  {}  on {} ", GSON.toJson(request),
+            log.info("create tube consumer group {}  on {} ", GSON.toJson(request),
                     clusterBean.getTubeManager() + "/v1/task?method=addTopicTask");
-            TubeManagerResponse rsp = httpUtils
-                    .request(clusterBean.getTubeManager() + "/v1/group?method=add", HttpMethod.POST,
-                            GSON.toJson(request),
-                            httpHeaders, TubeManagerResponse.class);
+            TubeManagerResponse rsp = httpUtils.request(clusterBean.getTubeManager() + "/v1/group?method=add",
+                    HttpMethod.POST, GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
             if (rsp.getErrCode() == -1) { // Creation failed
                 throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, rsp.getErrMsg());
             }
@@ -103,13 +102,29 @@ public class TubeMqOptService {
         HttpHeaders httpHeaders = new HttpHeaders();
         try {
             log.info(" query tube  cluster {} ", clusterBean.getTubeManager() + "/v1/cluster");
-            TubeClusterResponse rsp = httpUtils
-                    .request(clusterBean.getTubeManager() + "/v1/cluster", HttpMethod.GET, null, httpHeaders,
-                            TubeClusterResponse.class);
+            TubeClusterResponse rsp = httpUtils.request(clusterBean.getTubeManager() + "/v1/cluster",
+                    HttpMethod.GET, null, httpHeaders, TubeClusterResponse.class);
             return rsp.getData();
         } catch (Exception e) {
             log.error(" fail to  query tube  cluster ", e);
         }
         return null;
     }
+
+    public boolean queryTopicIsExist(QueryTubeTopicRequest queryTubeTopicRequest) {
+        HttpHeaders httpHeaders = new HttpHeaders();
+        httpHeaders.add("Content-Type", "application/json");
+        try {
+            String tubeManager = clusterBean.getTubeManager();
+            TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
+                    HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
+            if (response.getErrCode() == 0) { // topic already exists
+                log.error("topic {} exists in {} ", queryTubeTopicRequest.getTopicName(), tubeManager);
+                return true;
+            }
+        } catch (Exception e) {
+            log.error("fail to query tube topic {}", queryTubeTopicRequest.getTopicName(), e);
+        }
+        return false;
+    }
 }