You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2021/07/09 08:50:29 UTC
[incubator-inlong] branch master updated: [INLONG-694] Add retry
mechanism for creating tube consumer group
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 348b912 [INLONG-694] Add retry mechanism for creating tube consumer group
348b912 is described below
commit 348b912587fcc3c08ddc2d8bfbff230c1b79d170
Author: ireliasheng <ir...@tencent.com>
AuthorDate: Fri Jul 9 15:42:05 2021 +0800
[INLONG-694] Add retry mechanism for creating tube consumer group
---
.../inlong/manager/common/beans/TryBean.java | 44 ++++++++
.../CreateTubeConsumeGroupTaskEventListener.java | 121 +++++++++++++++++++++
.../service/thirdpart/mq/TubeMqOptService.java | 49 ++++++---
3 files changed, 197 insertions(+), 17 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
new file mode 100644
index 0000000..d9606c4
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/beans/TryBean.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.beans;
+
+import lombok.Data;
+import org.springframework.stereotype.Component;
+
+/**
+ * Holder for retry/back-off settings, registered as a Spring bean via {@code @Component}.
+ * Accessors are generated by Lombok's {@code @Data}; each field carries the default
+ * declared below unless a caller overrides it through the generated setter.
+ */
+@Data
+@Component
+public class TryBean {
+
+ /**
+ * Maximum number of attempts (default 3)
+ */
+ private Integer maxAttempts = 3;
+ /**
+ * The first delay time, in milliseconds (default 3000, i.e. 3 seconds)
+ */
+ private Long delay = 3000L;
+ /**
+ * The max delay time, in milliseconds (default 300000, i.e. 5 minutes)
+ */
+ private Long maxDelay = 300000L;
+ /**
+ * Delay time increase factor (default 2); intended to be multiplied into the
+ * delay between consecutive attempts
+ */
+ private Integer multiplier = 2;
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
new file mode 100644
index 0000000..31af6ee
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/CreateTubeConsumeGroupTaskEventListener.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.thirdpart.mq;
+
+import java.util.Collections;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.beans.TryBean;
+import org.apache.inlong.manager.common.pojo.business.BusinessInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
+import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
+import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
+import org.apache.inlong.manager.dao.mapper.ClusterInfoMapper;
+import org.apache.inlong.manager.service.core.BusinessService;
+import org.apache.inlong.manager.service.workflow.newbusiness.CreateResourceWorkflowForm;
+import org.apache.inlong.manager.workflow.core.event.ListenerResult;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEvent;
+import org.apache.inlong.manager.workflow.core.event.task.TaskEventListener;
+import org.apache.inlong.manager.workflow.exception.WorkflowListenerException;
+import org.apache.inlong.manager.workflow.model.WorkflowContext;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+@Slf4j
+public class CreateTubeConsumeGroupTaskEventListener implements TaskEventListener {
+
+ @Autowired
+ BusinessService businessService;
+
+ @Autowired
+ ClusterInfoMapper clusterInfoMapper;
+
+ @Autowired
+ TubeMqOptService tubeMqOptService;
+
+ @Value("${cluster.tube.clusterId}")
+ Integer clusterId;
+
+ @Autowired
+ TryBean tryBean;
+
+ @Override
+ public TaskEvent event() {
+ return TaskEvent.COMPLETE;
+ }
+
+
+ @Override
+ public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
+ CreateResourceWorkflowForm form = (CreateResourceWorkflowForm) context.getProcessForm();
+ String bid = form.getBusinessId();
+
+ log.info("try to create consumer group for bid {}", bid);
+
+ BusinessInfo businessInfo = businessService.get(bid);
+
+ String topicName = businessInfo.getMqResourceObj();
+
+ QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
+ .topicName(topicName).clusterId(clusterId)
+ .user(businessInfo.getCreator()).build();
+ // Query whether the tube topic exists
+ boolean topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
+
+ Integer tryNumber = tryBean.getMaxAttempts();
+ Long delay = tryBean.getDelay();
+ while (--tryNumber > 0) {
+ if (topicExist) {
+ break;
+ }
+
+ try {
+ Thread.sleep(delay);
+ delay *= tryBean.getMultiplier();
+ topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
+ } catch (InterruptedException e) {
+ log.error("Try to determine whether the tube topic exists {}", e.getMessage());
+ }
+
+ }
+ log.info("Try to determine whether the tube topic exists ,try number is {}", tryNumber);
+ AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
+ addTubeConsumeGroupRequest.setClusterId(clusterId);
+ addTubeConsumeGroupRequest.setCreateUser(businessInfo.getCreator());
+
+ GroupNameJsonSetBean groupNameJsonSetBean = new GroupNameJsonSetBean();
+ groupNameJsonSetBean.setTopicName(topicName);
+ String consumeGroupName = "sort_" + topicName + "_consumer_group";
+ groupNameJsonSetBean.setGroupName(consumeGroupName);
+ addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(groupNameJsonSetBean));
+
+ try {
+ tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+ } catch (Exception e) {
+ log.error("create tube consume group for bid={} error {}", bid, e.getMessage(), e);
+ }
+ log.info("finish to create consumer group for {}", bid);
+ return ListenerResult.success();
+ }
+
+ @Override
+ public boolean async() {
+ return false;
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java
index cfd3da8..4eae2f0 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdpart/mq/TubeMqOptService.java
@@ -56,20 +56,21 @@ public class TubeMqOptService {
}
AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
- .topicName(addTopicTasksBean.getTopicName())
- .clusterId(clusterBean.getClusterId())
- .user(request.getUser())
- .build();
+ .topicName(addTopicTasksBean.getTopicName()).clusterId(clusterBean.getClusterId())
+ .user(request.getUser()).build();
String tubeManager = clusterBean.getTubeManager();
- TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
- HttpMethod.POST, GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
+ TubeManagerResponse response = httpUtils
+ .request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
+ GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
if (response.getErrCode() == 101) { // topic already exists
- log.info("create tube topic {} on {} ", GSON.toJson(request),
+ log.info(" create tube topic {} on {} ", GSON.toJson(request),
tubeManager + "/v1/task?method=addTopicTask");
+
request.setClusterId(clusterBean.getClusterId());
- httpUtils.request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
- GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
+ TubeManagerResponse createRsp = httpUtils
+ .request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
+ GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
} else {
log.warn("topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
}
@@ -83,12 +84,10 @@ public class TubeMqOptService {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("Content-Type", "application/json");
try {
- log.info(" create tube consumer group {} on {} ", GSON.toJson(request),
+ log.info("create tube consumer group {} on {} ", GSON.toJson(request),
clusterBean.getTubeManager() + "/v1/task?method=addTopicTask");
- TubeManagerResponse rsp = httpUtils
- .request(clusterBean.getTubeManager() + "/v1/group?method=add", HttpMethod.POST,
- GSON.toJson(request),
- httpHeaders, TubeManagerResponse.class);
+ TubeManagerResponse rsp = httpUtils.request(clusterBean.getTubeManager() + "/v1/group?method=add",
+ HttpMethod.POST, GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
if (rsp.getErrCode() == -1) { // Creation failed
throw new BusinessException(BizErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, rsp.getErrMsg());
}
@@ -103,13 +102,29 @@ public class TubeMqOptService {
HttpHeaders httpHeaders = new HttpHeaders();
try {
log.info(" query tube cluster {} ", clusterBean.getTubeManager() + "/v1/cluster");
- TubeClusterResponse rsp = httpUtils
- .request(clusterBean.getTubeManager() + "/v1/cluster", HttpMethod.GET, null, httpHeaders,
- TubeClusterResponse.class);
+ TubeClusterResponse rsp = httpUtils.request(clusterBean.getTubeManager() + "/v1/cluster",
+ HttpMethod.GET, null, httpHeaders, TubeClusterResponse.class);
return rsp.getData();
} catch (Exception e) {
log.error(" fail to query tube cluster ", e);
}
return null;
}
+
+ public boolean queryTopicIsExist(QueryTubeTopicRequest queryTubeTopicRequest) {
+ HttpHeaders httpHeaders = new HttpHeaders();
+ httpHeaders.add("Content-Type", "application/json");
+ try {
+ String tubeManager = clusterBean.getTubeManager();
+ TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
+ HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
+ if (response.getErrCode() == 0) { // topic already exists
+ log.error("topic {} exists in {} ", queryTubeTopicRequest.getTopicName(), tubeManager);
+ return true;
+ }
+ } catch (Exception e) {
+ log.error("fail to query tube topic {}", queryTubeTopicRequest.getTopicName(), e);
+ }
+ return false;
+ }
}