You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/07/03 06:54:00 UTC
[inlong] 02/03: [INLONG-4843][Manager] Add RPC URL for TubeMQ cluster (#4844)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit f670f098d5dda1600c2fabca5b33f2b7aceffd99
Author: ganfengtan <Ga...@users.noreply.github.com>
AuthorDate: Sun Jul 3 14:18:28 2022 +0800
[INLONG-4843][Manager] Add RPC URL for TubeMQ cluster (#4844)
---
.../common/pojo/cluster/tube/TubeClusterDTO.java | 61 ++++++++++++++++++++++
.../common/pojo/cluster/tube/TubeClusterInfo.java | 4 +-
.../pojo/cluster/tube/TubeClusterRequest.java | 4 ++
.../service/cluster/TubeClusterOperator.java | 28 +++++++++-
.../manager/service/mq/util/TubeMQOperator.java | 4 +-
5 files changed, 96 insertions(+), 5 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
new file mode 100644
index 000000000..b3bde1042
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterDTO.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.inlong.manager.common.pojo.cluster.tube;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Tube cluster info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Tube cluster info")
+public class TubeClusterDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+ @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080")
+ private String masterWebUrl;
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static TubeClusterDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, TubeClusterDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java
index 968a0de54..f32e89040 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterInfo.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.common.pojo.cluster.tube;
import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -36,7 +37,8 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ApiModel("Inlong cluster info for Tube")
public class TubeClusterInfo extends ClusterInfo {
- // no fields
+ @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080")
+ private String masterWebUrl;
public TubeClusterInfo() {
this.setType(ClusterType.TUBE);
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java
index 613905519..f8b3002d1 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/cluster/tube/TubeClusterRequest.java
@@ -18,6 +18,7 @@
package org.apache.inlong.manager.common.pojo.cluster.tube;
import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
@@ -35,6 +36,9 @@ import org.apache.inlong.manager.common.util.JsonTypeDefine;
@ApiModel("Inlong cluster request for Tube")
public class TubeClusterRequest extends ClusterRequest {
+ @ApiModelProperty(value = "Master Web URL http://120.0.0.1:8080")
+ private String masterWebUrl;
+
// no field
public TubeClusterRequest() {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
index 2a7e01c48..cb6eed3bc 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/TubeClusterOperator.java
@@ -17,17 +17,22 @@
package org.apache.inlong.manager.service.cluster;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterDTO;
import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterRequest;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.service.group.InlongNoneMqOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
@@ -38,6 +43,9 @@ public class TubeClusterOperator extends AbstractClusterOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongNoneMqOperator.class);
+ @Autowired
+ private ObjectMapper objectMapper;
+
@Override
public Boolean accept(String clusterType) {
return getClusterType().equals(clusterType);
@@ -50,7 +58,15 @@ public class TubeClusterOperator extends AbstractClusterOperator {
@Override
protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
- LOGGER.info("do nothing for tube cluster in set target entity");
+ TubeClusterRequest tubeRequest = (TubeClusterRequest) request;
+ CommonBeanUtils.copyProperties(tubeRequest, targetEntity, true);
+ try {
+ TubeClusterDTO dto = objectMapper.convertValue(tubeRequest, TubeClusterDTO.class);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ LOGGER.info("success to set entity for tube cluster");
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
}
@Override
@@ -58,7 +74,15 @@ public class TubeClusterOperator extends AbstractClusterOperator {
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
}
- return CommonBeanUtils.copyProperties(entity, TubeClusterInfo::new);
+ TubeClusterInfo tubeClusterInfo = new TubeClusterInfo();
+ CommonBeanUtils.copyProperties(entity, tubeClusterInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ TubeClusterDTO dto = TubeClusterDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, tubeClusterInfo);
+ }
+
+ LOGGER.info("success to get tube cluster info from entity");
+ return tubeClusterInfo;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
index 0f68ba995..1fa9e489a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
@@ -69,7 +69,7 @@ public class TubeMQOperator {
* Create topic for the given tube cluster.
*/
public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) {
- String masterUrl = tubeCluster.getUrl();
+ String masterUrl = tubeCluster.getMasterWebUrl();
LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
throw new BusinessException("tube master url or tube topic cannot be null");
@@ -88,7 +88,7 @@ public class TubeMQOperator {
* Create consumer group for the given tube topic and cluster.
*/
public void createConsumerGroup(TubeClusterInfo tubeCluster, String topic, String consumerGroup, String operator) {
- String masterUrl = tubeCluster.getUrl();
+ String masterUrl = tubeCluster.getMasterWebUrl();
LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topic, masterUrl);
if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
throw new BusinessException("tube master url, consumer group, or tube topic cannot be null");