You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by zi...@apache.org on 2022/08/17 06:25:12 UTC
[inlong] branch master updated: [INLONG-5380][Manager] Modify the saving function of the data node (#5381)
This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new dd3cf6ecf [INLONG-5380][Manager] Modify the saving function of the data node (#5381)
dd3cf6ecf is described below
commit dd3cf6ecff4f655e629a6bd2d6ea2e56f385d3cb
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Wed Aug 17 14:25:07 2022 +0800
[INLONG-5380][Manager] Modify the saving function of the data node (#5381)
---
.../client/api/inner/ClientFactoryTest.java | 17 ++--
.../inlong/manager/common/enums/ErrorCodeEnum.java | 3 +
.../inlong/manager/common/util/HttpUtils.java | 26 ++++--
.../{DataNodeRequest.java => DataNodeInfo.java} | 44 +++++-----
.../manager/pojo/node/DataNodePageRequest.java | 4 +-
.../inlong/manager/pojo/node/DataNodeRequest.java | 16 ++--
.../manager/pojo/node/hive/HiveDataNodeDTO.java | 94 ++++++++++++++++++++++
.../manager/pojo/node/hive/HiveDataNodeInfo.java | 66 +++++++++++++++
.../pojo/node/hive/HiveDataNodeRequest.java | 61 ++++++++++++++
.../service/node/AbstractDataNodeOperator.java | 78 ++++++++++++++++++
.../manager/service/node/DataNodeOperator.java | 65 +++++++++++++++
.../service/node/DataNodeOperatorFactory.java | 46 +++++++++++
.../service/{core => node}/DataNodeService.java | 8 +-
.../{core/impl => node}/DataNodeServiceImpl.java | 50 ++++++------
.../service/node/hive/HiveDataNodeOperator.java | 87 ++++++++++++++++++++
.../manager/service/sink/AbstractSinkOperator.java | 21 ++++-
.../manager/service/sink/StreamSinkOperator.java | 7 ++
.../service/sink/StreamSinkServiceImpl.java | 33 +++-----
.../service/core/impl/DataNodeServiceTest.java | 29 ++++---
.../manager/web/controller/DataNodeController.java | 10 +--
.../web/controller/DataNodeControllerTest.java | 36 +++++----
21 files changed, 667 insertions(+), 134 deletions(-)
diff --git a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
index 203fd10ae..26910e33b 100644
--- a/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
+++ b/inlong-manager/manager-client/src/test/java/org/apache/inlong/manager/client/api/inner/ClientFactoryTest.java
@@ -62,8 +62,8 @@ import org.apache.inlong.manager.pojo.group.InlongGroupResetRequest;
import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.pojo.sink.ck.ClickHouseSink;
import org.apache.inlong.manager.pojo.sink.es.ElasticsearchSink;
@@ -965,9 +965,8 @@ class ClientFactoryTest {
Response.success(1))
))
);
- DataNodeRequest request = new DataNodeRequest();
- request.setName("test_node");
- request.setType(DataNodeType.HIVE);
+ HiveDataNodeRequest request = new HiveDataNodeRequest();
+ request.setName("test_hive_node");
Integer nodeId = dataNodeClient.save(request);
Assertions.assertEquals(1, nodeId);
}
@@ -1007,9 +1006,8 @@ class ClientFactoryTest {
)
);
- DataNodeRequest request = new DataNodeRequest();
- request.setName("test_node");
- request.setToken(DataNodeType.HIVE);
+ HiveDataNodeRequest request = new HiveDataNodeRequest();
+ request.setName("test_hive_node");
PageInfo<DataNodeResponse> nodePageInfo = dataNodeClient.list(request);
Assertions.assertEquals(JsonUtils.toJsonString(nodePageInfo.getList()), JsonUtils.toJsonString(nodeResponses));
}
@@ -1025,10 +1023,9 @@ class ClientFactoryTest {
)
);
- DataNodeRequest request = new DataNodeRequest();
+ HiveDataNodeRequest request = new HiveDataNodeRequest();
request.setId(1);
- request.setName("test_node");
- request.setType(DataNodeType.HIVE);
+ request.setName("test_hive_node");
Boolean isUpdate = dataNodeClient.update(request);
Assertions.assertTrue(isUpdate);
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
index ce24b12ee..d6cc20776 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ErrorCodeEnum.java
@@ -60,6 +60,9 @@ public enum ErrorCodeEnum {
CLUSTER_TYPE_NOT_SUPPORTED(1102, "Cluster type '%s' not supported"),
CLUSTER_INFO_INCORRECT(1103, "Cluster info was incorrect"),
+ DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
+ DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
+
STREAM_NOT_FOUND(1201, "Inlong stream does not exist/no operation permission"),
STREAM_ID_DUPLICATE(1202, "The current inlong group has a inlong stream with the same ID"),
STREAM_OPT_NOT_ALLOWED(1203,
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
index 13b6bbad8..09c72464a 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/util/HttpUtils.java
@@ -98,7 +98,7 @@ public class HttpUtils {
/**
* Send an HTTP request
*/
- public <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
+ public static <T> T request(RestTemplate restTemplate, String url, HttpMethod httpMethod, Object requestBody,
HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
if (log.isDebugEnabled()) {
log.debug("begin request to {} by request body {}", url, GSON.toJson(requestBody));
@@ -112,17 +112,31 @@ public class HttpUtils {
return response.getBody();
}
- public <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
+ /**
+ * Send GET request to the specified URL.
+ */
+ public static <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params,
+ HttpHeaders header, ParameterizedTypeReference<T> typeReference) {
+ return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
+ }
+
+ /**
+ * Send PUT request to the specified URL.
+ */
+ public static <T> T putRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
ParameterizedTypeReference<T> typeReference) {
- return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
+ return request(restTemplate, url, HttpMethod.PUT, params, header, typeReference);
}
- public <T> T getRequest(RestTemplate restTemplate, String url, Map<String, Object> params, HttpHeaders header,
+ /**
+ * Send POST request to the specified URL.
+ */
+ public static <T> T postRequest(RestTemplate restTemplate, String url, Object params, HttpHeaders header,
ParameterizedTypeReference<T> typeReference) {
- return request(restTemplate, buildUrlWithQueryParam(url, params), HttpMethod.GET, null, header, typeReference);
+ return request(restTemplate, url, HttpMethod.POST, params, header, typeReference);
}
- private String buildUrlWithQueryParam(String url, Map<String, Object> params) {
+ private static String buildUrlWithQueryParam(String url, Map<String, Object> params) {
if (params == null) {
return url;
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
similarity index 60%
copy from inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
index 3db00c792..d17569e14 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeInfo.java
@@ -17,46 +17,42 @@
package org.apache.inlong.manager.pojo.node;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.validation.UpdateValidation;
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
+import java.util.Date;
/**
- * Data node request
+ * Data node info
*/
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
-@ApiModel("Data node request")
-public class DataNodeRequest {
+@ApiModel("Data node info")
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class DataNodeInfo {
- @NotNull(groups = UpdateValidation.class)
@ApiModelProperty(value = "Primary key")
private Integer id;
- @NotBlank(message = "node name cannot be blank")
- @ApiModelProperty(value = "Node name")
+ @ApiModelProperty(value = "Data node name")
private String name;
- @NotBlank(message = "node type cannot be blank")
- @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+ @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
private String type;
- @ApiModelProperty(value = "Node url")
+ @ApiModelProperty(value = "Data node URL")
private String url;
- @ApiModelProperty(value = "Node username")
+ @ApiModelProperty("Data node username")
private String username;
- @ApiModelProperty(value = "Node token if needed")
+ @ApiModelProperty(value = "Data node token if needed")
private String token;
@ApiModelProperty(value = "Extended params")
@@ -65,11 +61,23 @@ public class DataNodeRequest {
@ApiModelProperty(value = "Description of the data node")
private String description;
- @NotBlank(message = "inCharges cannot be blank")
- @ApiModelProperty(value = "Name of responsible person, separated by commas", required = true)
+ @ApiModelProperty(value = "Name of in charges, separated by commas")
private String inCharges;
+ @ApiModelProperty(value = "Name of in creator")
+ private String creator;
+
+ @ApiModelProperty(value = "Name of in modifier")
+ private String modifier;
+
@ApiModelProperty(value = "Version number")
private Integer version;
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date createTime;
+
+ @JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")
+ private Date modifyTime;
+
+ public abstract DataNodeRequest genRequest();
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
index 6f3862776..496aa9c9c 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodePageRequest.java
@@ -31,10 +31,10 @@ import org.apache.inlong.manager.pojo.common.PageRequest;
@ApiModel("Data node paging query request")
public class DataNodePageRequest extends PageRequest {
- @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+ @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
private String type;
- @ApiModelProperty(value = "Node name")
+ @ApiModelProperty(value = "Data node name")
private String name;
@ApiModelProperty(value = "Keywords, name, url, etc.")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
index 3db00c792..54504e2dc 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/DataNodeRequest.java
@@ -17,10 +17,10 @@
package org.apache.inlong.manager.pojo.node;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.common.validation.UpdateValidation;
@@ -32,31 +32,31 @@ import javax.validation.constraints.NotNull;
* Data node request
*/
@Data
-@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("Data node request")
-public class DataNodeRequest {
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, visible = true, property = "type")
+public abstract class DataNodeRequest {
@NotNull(groups = UpdateValidation.class)
@ApiModelProperty(value = "Primary key")
private Integer id;
@NotBlank(message = "node name cannot be blank")
- @ApiModelProperty(value = "Node name")
+ @ApiModelProperty(value = "Data node name")
private String name;
@NotBlank(message = "node type cannot be blank")
- @ApiModelProperty(value = "Node type, including MYSQL, HIVE, KAFKA, ES, etc.")
+ @ApiModelProperty(value = "Data node type, including MYSQL, HIVE, KAFKA, ES, etc.")
private String type;
- @ApiModelProperty(value = "Node url")
+ @ApiModelProperty(value = "Data node URL")
private String url;
- @ApiModelProperty(value = "Node username")
+ @ApiModelProperty(value = "Data node username")
private String username;
- @ApiModelProperty(value = "Node token if needed")
+ @ApiModelProperty(value = "Data node token if needed")
private String token;
@ApiModelProperty(value = "Extended params")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
new file mode 100644
index 000000000..5922e61e4
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeDTO.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Hive data node info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@ApiModel("Hive data node info")
+public class HiveDataNodeDTO {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeDTO.class);
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); // thread safe
+
+ @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Version for Hive, such as: 3.2.1")
+ private String hiveVersion;
+
+ @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+ private String hiveConfDir;
+
+ @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+ private String hdfsPath;
+
+ @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+ private String warehouse;
+
+ @ApiModelProperty("User and group information for writing data to HDFS")
+ private String hdfsUgi;
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static HiveDataNodeDTO getFromRequest(HiveDataNodeRequest request) throws Exception {
+ return HiveDataNodeDTO.builder()
+ .jdbcUrl(request.getJdbcUrl())
+ .hiveVersion(request.getHiveVersion())
+ .hiveConfDir(request.getHiveConfDir())
+ .hdfsPath(request.getHdfsPath())
+ .warehouse(request.getWarehouse())
+ .hdfsUgi(request.getHdfsUgi())
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static HiveDataNodeDTO getFromJson(@NotNull String extParams) {
+ try {
+ OBJECT_MAPPER.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ return OBJECT_MAPPER.readValue(extParams, HiveDataNodeDTO.class);
+ } catch (Exception e) {
+ LOGGER.error("Failed to extract additional parameters for hive data node: ", e);
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
new file mode 100644
index 000000000..34b50ed06
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+
+/**
+ * Hive data node info
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HIVE)
+@ApiModel("Hive data node info")
+public class HiveDataNodeInfo extends DataNodeInfo {
+
+ @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Version for Hive, such as: 3.2.1")
+ private String hiveVersion;
+
+ @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+ private String hiveConfDir;
+
+ @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+ private String hdfsPath;
+
+ @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+ private String warehouse;
+
+ @ApiModelProperty("User and group information for writing data to HDFS")
+ private String hdfsUgi;
+
+ public HiveDataNodeInfo() {
+ this.setType(DataNodeType.HIVE);
+ }
+
+ @Override
+ public HiveDataNodeRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, HiveDataNodeRequest::new);
+ }
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
new file mode 100644
index 000000000..ab51ed666
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeRequest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.node.hive;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Hive data node request
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = DataNodeType.HIVE)
+@ApiModel("Hive data node request")
+public class HiveDataNodeRequest extends DataNodeRequest {
+
+ @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
+ private String jdbcUrl;
+
+ @ApiModelProperty("Version for Hive, such as: 3.2.1")
+ private String hiveVersion;
+
+ @ApiModelProperty("Config directory of Hive on HDFS, needed by sort in light mode, must include hive-site.xml")
+ private String hiveConfDir;
+
+ @ApiModelProperty("HDFS default FS, such as: hdfs://127.0.0.1:9000")
+ private String hdfsPath;
+
+ @ApiModelProperty("Hive warehouse path, such as: /user/hive/warehouse/")
+ private String warehouse;
+
+ @ApiModelProperty("User and group information for writing data to HDFS")
+ private String hdfsUgi;
+
+ public HiveDataNodeRequest() {
+ this.setType(DataNodeType.HIVE);
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
new file mode 100644
index 000000000..d66ff1233
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/AbstractDataNodeOperator.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.transaction.annotation.Isolation;
+import org.springframework.transaction.annotation.Transactional;
+
+/**
+ * Default operation of data node.
+ */
+public abstract class AbstractDataNodeOperator implements DataNodeOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(AbstractDataNodeOperator.class);
+
+ @Autowired
+ protected DataNodeEntityMapper dataNodeEntityMapper;
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class)
+ public Integer saveOpt(DataNodeRequest request, String operator) {
+ DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
+ // set the ext params
+ this.setTargetEntity(request, entity);
+ entity.setCreator(operator);
+ entity.setModifier(operator);
+ dataNodeEntityMapper.insert(entity);
+
+ return entity.getId();
+ }
+
+ /**
+ * Set the parameters of the target entity.
+ *
+ * @param request data node request
+ * @param targetEntity entity which will set the new parameters
+ */
+ protected abstract void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity);
+
+ @Override
+ @Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
+ public void updateOpt(DataNodeRequest request, String operator) {
+ DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
+ // set the ext params
+ this.setTargetEntity(request, entity);
+ entity.setModifier(operator);
+ int rowCount = dataNodeEntityMapper.updateByIdSelective(entity);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("data node has already updated with name={}, type={}, curVersion={}", request.getName(),
+ request.getType(), request.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
new file mode 100644
index 000000000..ce579994e
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperator.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+
+/**
+ * Interface of the data node operator.
+ */
+public interface DataNodeOperator {
+
+ /**
+ * Determines whether the current instance matches the specified type.
+ */
+ Boolean accept(String dataNodeType);
+
+ /**
+ * Get the data node type.
+ *
+ * @return data node type string
+ */
+ String getDataNodeType();
+
+ /**
+ * Save the data node info.
+ *
+ * @param request request of the data node
+ * @param operator name of the operator
+ * @return data node id after saving
+ */
+ Integer saveOpt(DataNodeRequest request, String operator);
+
+ /**
+ * Get the data node info from the given entity.
+ *
+ * @param entity get field value from the entity
+ * @return data node info after encapsulating
+ */
+ DataNodeInfo getFromEntity(DataNodeEntity entity);
+
+ /**
+ * Update the data node info.
+ *
+ * @param request request of update
+ * @param operator name of operator
+ */
+ void updateOpt(DataNodeRequest request, String operator);
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java
new file mode 100644
index 000000000..fba9d84d7
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Factory for {@link DataNodeOperator}.
+ */
+@Service
+public class DataNodeOperatorFactory {
+
+ @Autowired
+ private List<DataNodeOperator> dataNodeOperatorList;
+
+ /**
+ * Get a data node operator instance via the given type
+ */
+ public DataNodeOperator getInstance(String type) {
+ return dataNodeOperatorList.stream()
+ .filter(inst -> inst.accept(type))
+ .findFirst()
+ .orElseThrow(() -> new BusinessException(
+ String.format(ErrorCodeEnum.DATA_NODE_TYPE_NOT_SUPPORTED.getMessage(), type)));
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
similarity index 91%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
index 027fd1bf1..29422e81d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/DataNodeService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeService.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.manager.service.node;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
/**
* Data node service layer interface
@@ -42,7 +42,7 @@ public interface DataNodeService {
* @param id node id
* @return node info
*/
- DataNodeResponse get(Integer id);
+ DataNodeInfo get(Integer id);
/**
* Paging query nodes according to conditions.
@@ -50,7 +50,7 @@ public interface DataNodeService {
* @param request page request conditions
* @return node list
*/
- PageInfo<DataNodeResponse> list(DataNodePageRequest request);
+ PageInfo<DataNodeInfo> list(DataNodePageRequest request);
/**
* Update data node.
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
similarity index 83%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
index e87edc293..893987663 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeServiceImpl.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core.impl;
+package org.apache.inlong.manager.service.node;
import com.github.pagehelper.Page;
import com.github.pagehelper.PageHelper;
@@ -24,14 +24,12 @@ import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
-import org.apache.inlong.manager.service.core.DataNodeService;
import org.apache.inlong.manager.service.resource.sink.hive.HiveJdbcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +39,7 @@ import org.springframework.stereotype.Service;
import java.sql.Connection;
import java.util.List;
import java.util.Objects;
+import java.util.stream.Collectors;
/**
* Data node service layer implementation
@@ -52,6 +51,8 @@ public class DataNodeServiceImpl implements DataNodeService {
@Autowired
private DataNodeEntityMapper dataNodeMapper;
+ @Autowired
+ private DataNodeOperatorFactory operatorFactory;
@Override
public Integer save(DataNodeRequest request, String operator) {
@@ -65,33 +66,38 @@ public class DataNodeServiceImpl implements DataNodeService {
LOGGER.error(errMsg);
throw new BusinessException(errMsg);
}
- DataNodeEntity entity = CommonBeanUtils.copyProperties(request, DataNodeEntity::new);
- entity.setCreator(operator);
- entity.setModifier(operator);
- dataNodeMapper.insert(entity);
-
+ // according to the data node type, save data node information
+ DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+ int id = dataNodeOperator.saveOpt(request, operator);
LOGGER.debug("success to save data node={}", request);
- return entity.getId();
+ return id;
}
@Override
- public DataNodeResponse get(Integer id) {
+ public DataNodeInfo get(Integer id) {
DataNodeEntity entity = dataNodeMapper.selectById(id);
if (entity == null) {
LOGGER.error("data node not found by id={}", id);
throw new BusinessException("data node not found");
}
- DataNodeResponse response = CommonBeanUtils.copyProperties(entity, DataNodeResponse::new);
+
+ String dataNodeType = entity.getType();
+ DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType);
+ DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
LOGGER.debug("success to get data node info by id={}", id);
- return response;
+ return dataNodeInfo;
}
@Override
- public PageInfo<DataNodeResponse> list(DataNodePageRequest request) {
+ public PageInfo<DataNodeInfo> list(DataNodePageRequest request) {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<DataNodeEntity> entityPage = (Page<DataNodeEntity>) dataNodeMapper.selectByCondition(request);
- List<DataNodeResponse> responseList = CommonBeanUtils.copyListProperties(entityPage, DataNodeResponse::new);
- PageInfo<DataNodeResponse> page = new PageInfo<>(responseList);
+ List<DataNodeInfo> list = entityPage.stream()
+ .map(entity -> {
+ DataNodeOperator dataNodeOperator = operatorFactory.getInstance(entity.getType());
+ return dataNodeOperator.getFromEntity(entity);
+ }).collect(Collectors.toList());
+ PageInfo<DataNodeInfo> page = new PageInfo<>(list);
page.setTotal(entityPage.getTotal());
LOGGER.debug("success to list data node by {}", request);
return page;
@@ -101,9 +107,8 @@ public class DataNodeServiceImpl implements DataNodeService {
public Boolean update(DataNodeRequest request, String operator) {
String name = request.getName();
String type = request.getType();
-
- Integer id = request.getId();
DataNodeEntity exist = dataNodeMapper.selectByNameAndType(name, type);
+ Integer id = request.getId();
if (exist != null && !Objects.equals(id, exist.getId())) {
String errMsg = String.format("data node already exist for name=%s type=%s", name, type);
LOGGER.error(errMsg);
@@ -121,13 +126,8 @@ public class DataNodeServiceImpl implements DataNodeService {
LOGGER.error(errMsg);
throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
}
- CommonBeanUtils.copyProperties(request, entity, true);
- entity.setModifier(operator);
- int rowCount = dataNodeMapper.updateById(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error(errMsg);
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
+ DataNodeOperator dataNodeOperator = operatorFactory.getInstance(request.getType());
+ dataNodeOperator.updateOpt(request, operator);
LOGGER.info("success to update data node={}", request);
return true;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
new file mode 100644
index 000000000..7cb25bd5f
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/hive/HiveDataNodeOperator.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node.hive;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeDTO;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
+import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+@Service
+public class HiveDataNodeOperator extends AbstractDataNodeOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(HiveDataNodeOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String dataNodeType) {
+ return getDataNodeType().equals(dataNodeType);
+ }
+
+ @Override
+ public String getDataNodeType() {
+ return DataNodeType.HIVE;
+ }
+
+ @Override
+ public DataNodeInfo getFromEntity(DataNodeEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
+ }
+
+ HiveDataNodeInfo hiveDataNodeInfo = new HiveDataNodeInfo();
+ CommonBeanUtils.copyProperties(entity, hiveDataNodeInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ HiveDataNodeDTO dto = HiveDataNodeDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, hiveDataNodeInfo);
+ }
+
+ LOGGER.debug("success to get hive data node from entity");
+ return hiveDataNodeInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
+ HiveDataNodeRequest hiveDataNodeRequest = (HiveDataNodeRequest) request;
+ CommonBeanUtils.copyProperties(hiveDataNodeRequest, targetEntity, true);
+ try {
+ HiveDataNodeDTO dto = HiveDataNodeDTO.getFromRequest(hiveDataNodeRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ LOGGER.debug("success to set entity for hive data node");
+ } catch (Exception e) {
+ LOGGER.error("failed to set entity for hive data node: ", e);
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
index a771b2d32..2e790fff1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/AbstractSinkOperator.java
@@ -26,15 +26,15 @@ import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.pojo.sink.SinkField;
-import org.apache.inlong.manager.pojo.sink.SinkRequest;
-import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.sink.SinkField;
+import org.apache.inlong.manager.pojo.sink.SinkRequest;
+import org.apache.inlong.manager.pojo.sink.StreamSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -186,6 +186,21 @@ public abstract class AbstractSinkOperator implements StreamSinkOperator {
LOGGER.info("success to save sink fields");
}
+ @Override
+ public void deleteOpt(StreamSinkEntity entity, String operator) {
+ entity.setPreviousStatus(entity.getStatus());
+ entity.setStatus(InlongConstants.DELETED_STATUS);
+ entity.setIsDeleted(entity.getId());
+ entity.setModifier(operator);
+ int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
+ if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
+ LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
+ entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
+ throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
+ }
+ sinkFieldMapper.logicDeleteAll(entity.getId());
+ }
+
/**
* Check the validity of sink fields.
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
index 69e3b6c1c..e8231c419 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkOperator.java
@@ -89,4 +89,11 @@ public interface StreamSinkOperator {
*/
void updateFieldOpt(Boolean onlyAdd, SinkRequest request);
+ /**
+ * Delete the sink info.
+ *
+ * @param entity sink info to be deleted
+ * @param operator name of the operator
+ */
+ void deleteOpt(StreamSinkEntity entity, String operator);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 311fd9329..f5860fb9e 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -55,7 +55,6 @@ import org.springframework.transaction.annotation.Transactional;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -106,13 +105,13 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
// According to the sink type, save sink information
- StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType());
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
List<SinkField> fields = request.getSinkFieldList();
// Remove id in sinkField when save
if (CollectionUtils.isNotEmpty(fields)) {
fields.forEach(sinkField -> sinkField.setId(null));
}
- int id = operation.saveOpt(request, operator);
+ int id = sinkOperator.saveOpt(request, operator);
LOGGER.info("success to save sink info: {}", request);
return id;
@@ -126,8 +125,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
LOGGER.error("sink not found by id={}", id);
throw new BusinessException(ErrorCodeEnum.SINK_INFO_NOT_FOUND);
}
- StreamSinkOperator operation = operatorFactory.getInstance(entity.getSinkType());
- StreamSink streamSink = operation.getFromEntity(entity);
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+ StreamSink streamSink = sinkOperator.getFromEntity(entity);
LOGGER.debug("success to get sink info by id={}", id);
return streamSink;
}
@@ -191,8 +190,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
}
List<StreamSink> responseList = Lists.newArrayList();
for (Map.Entry<String, Page<StreamSinkEntity>> entry : sinkMap.entrySet()) {
- StreamSinkOperator operation = operatorFactory.getInstance(entry.getKey());
- PageInfo<? extends StreamSink> pageInfo = operation.getPageInfo(entry.getValue());
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(entry.getKey());
+ PageInfo<? extends StreamSink> pageInfo = sinkOperator.getPageInfo(entry.getValue());
responseList.addAll(pageInfo.getList());
}
// Encapsulate the paging query results into the PageInfo object to obtain related paging information
@@ -230,8 +229,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
fields.forEach(sinkField -> sinkField.setId(null));
}
- StreamSinkOperator operation = operatorFactory.getInstance(request.getSinkType());
- operation.updateOpt(request, operator);
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(request.getSinkType());
+ sinkOperator.updateOpt(request, operator);
// The inlong group status is [Configuration successful], then asynchronously initiate
// the [Single inlong stream resource creation] workflow
@@ -266,20 +265,8 @@ public class StreamSinkServiceImpl implements StreamSinkService {
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
-
- entity.setPreviousStatus(entity.getStatus());
- entity.setStatus(InlongConstants.DELETED_STATUS);
- entity.setIsDeleted(id);
- entity.setModifier(operator);
- entity.setModifyTime(new Date());
- int rowCount = sinkMapper.updateByPrimaryKeySelective(entity);
- if (rowCount != InlongConstants.AFFECTED_ONE_ROW) {
- LOGGER.error("sink has already updated with groupId={}, streamId={}, name={}, curVersion={}",
- entity.getInlongGroupId(), entity.getInlongStreamId(), entity.getSinkName(), entity.getVersion());
- throw new BusinessException(ErrorCodeEnum.CONFIG_EXPIRED);
- }
- sinkFieldMapper.logicDeleteAll(id);
-
+ StreamSinkOperator sinkOperator = operatorFactory.getInstance(entity.getSinkType());
+ sinkOperator.deleteOpt(entity, operator);
LOGGER.info("success to delete sink info: {}", entity);
return true;
}
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
index a35aef416..41936336b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/DataNodeServiceTest.java
@@ -18,11 +18,12 @@
package org.apache.inlong.manager.service.core.impl;
import com.github.pagehelper.PageInfo;
+import org.apache.inlong.manager.common.consts.DataNodeType;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
import org.apache.inlong.manager.service.ServiceBaseTest;
-import org.apache.inlong.manager.service.core.DataNodeService;
+import org.apache.inlong.manager.service.node.DataNodeService;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -39,7 +40,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
* Save data node info.
*/
public Integer saveOpt(String nodeName, String type, String url, String username, String password) {
- DataNodeRequest request = new DataNodeRequest();
+ HiveDataNodeRequest request = new HiveDataNodeRequest();
request.setName(nodeName);
request.setType(type);
request.setUrl(url);
@@ -47,13 +48,15 @@ public class DataNodeServiceTest extends ServiceBaseTest {
request.setToken(password);
request.setDescription("test cluster");
request.setInCharges(GLOBAL_OPERATOR);
+ request.setJdbcUrl("127.0.0.1");
+ request.setToken("123456");
return dataNodeService.save(request, GLOBAL_OPERATOR);
}
/**
* Get data node list info.
*/
- public PageInfo<DataNodeResponse> listOpt(String type, String name) {
+ public PageInfo<DataNodeInfo> listOpt(String type, String name) {
DataNodePageRequest request = new DataNodePageRequest();
request.setType(type);
request.setName(name);
@@ -65,7 +68,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
*/
public Boolean updateOpt(Integer id, String nodeName, String type, String url, String username, String password,
Integer version) {
- DataNodeRequest request = new DataNodeRequest();
+ HiveDataNodeRequest request = new HiveDataNodeRequest();
request.setId(id);
request.setName(nodeName);
request.setType(type);
@@ -86,7 +89,7 @@ public class DataNodeServiceTest extends ServiceBaseTest {
@Test
public void testDataService() {
String nodeName = "hiveNode1";
- String type = "HIVE";
+ String type = DataNodeType.HIVE;
String url = "127.0.0.1:8080";
String usename = "admin";
String password = "123";
@@ -96,17 +99,17 @@ public class DataNodeServiceTest extends ServiceBaseTest {
Assertions.assertNotNull(id);
// test get data node
- DataNodeResponse nodeResponse = dataNodeService.get(id);
- Assertions.assertNotNull(nodeResponse);
- Assertions.assertEquals(type, nodeResponse.getType());
+ DataNodeInfo dataNodeInfo = dataNodeService.get(id);
+ Assertions.assertNotNull(dataNodeInfo);
+ Assertions.assertEquals(type, dataNodeInfo.getType());
// test get data node list
- PageInfo<DataNodeResponse> listDataNode = this.listOpt(type, nodeName);
+ PageInfo<DataNodeInfo> listDataNode = this.listOpt(type, nodeName);
Assertions.assertEquals(listDataNode.getTotal(), 1);
// test update data node
- String newNodeName = "kafkaNode1";
- String newType = "KAFKA";
+ String newNodeName = "hiveNode2";
+ String newType = DataNodeType.HIVE;
String newUrl = "127.0.0.1:8083";
String newUsername = "admin2";
String newPassword = "456";
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
index a8c8a81bd..802181c00 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/DataNodeController.java
@@ -22,13 +22,13 @@ import io.swagger.annotations.Api;
import io.swagger.annotations.ApiImplicitParam;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.manager.common.enums.OperationType;
-import org.apache.inlong.manager.pojo.common.Response;
import org.apache.inlong.manager.common.validation.UpdateValidation;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodePageRequest;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
import org.apache.inlong.manager.pojo.user.UserRoleCode;
-import org.apache.inlong.manager.service.core.DataNodeService;
+import org.apache.inlong.manager.service.node.DataNodeService;
import org.apache.inlong.manager.service.operationlog.OperationLog;
import org.apache.inlong.manager.service.user.LoginUserUtils;
import org.apache.shiro.authz.annotation.RequiresRoles;
@@ -65,13 +65,13 @@ public class DataNodeController {
@GetMapping(value = "/node/get/{id}")
@ApiOperation(value = "Get node by id")
@ApiImplicitParam(name = "id", value = "Data node ID", dataTypeClass = Integer.class, required = true)
- public Response<DataNodeResponse> get(@PathVariable Integer id) {
+ public Response<DataNodeInfo> get(@PathVariable Integer id) {
return Response.success(dataNodeService.get(id));
}
@PostMapping(value = "/node/list")
@ApiOperation(value = "List data node")
- public Response<PageInfo<DataNodeResponse>> list(@RequestBody DataNodePageRequest request) {
+ public Response<PageInfo<DataNodeInfo>> list(@RequestBody DataNodePageRequest request) {
return Response.success(dataNodeService.list(request));
}
diff --git a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
index 99223394f..ba7b10353 100644
--- a/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
+++ b/inlong-manager/manager-web/src/test/java/org/apache/inlong/manager/web/controller/DataNodeControllerTest.java
@@ -17,12 +17,14 @@
package org.apache.inlong.manager.web.controller;
-import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.consts.InlongConstants;
-import org.apache.inlong.manager.pojo.node.DataNodeRequest;
-import org.apache.inlong.manager.pojo.node.DataNodeResponse;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.common.Response;
+import org.apache.inlong.manager.pojo.node.DataNodeRequest;
+import org.apache.inlong.manager.pojo.node.DataNodeResponse;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeRequest;
import org.apache.inlong.manager.web.WebBaseTest;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -36,15 +38,15 @@ class DataNodeControllerTest extends WebBaseTest {
@Resource
DataNodeEntityMapper dataNodeEntityMapper;
- DataNodeRequest getDataNodeRequest() {
- return DataNodeRequest.builder()
- .name("hiveNode1")
- .type("HIVE")
- .url("127.0.0.1:8080")
- .username("admin")
- .token("123")
- .inCharges("admin")
- .build();
+ HiveDataNodeRequest getHiveDataNodeRequest() {
+ HiveDataNodeRequest hiveDataNodeRequest = new HiveDataNodeRequest();
+ hiveDataNodeRequest.setName("hiveNode1");
+ hiveDataNodeRequest.setType(DataNodeType.HIVE);
+ hiveDataNodeRequest.setUrl("127.0.0.1:8080");
+ hiveDataNodeRequest.setUsername("admin");
+ hiveDataNodeRequest.setToken("123");
+ hiveDataNodeRequest.setInCharges("admin");
+ return hiveDataNodeRequest;
}
@Test
@@ -52,7 +54,7 @@ class DataNodeControllerTest extends WebBaseTest {
logout();
operatorLogin();
- MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest());
+ MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest());
Response<Integer> response = getResBody(mvcResult, Integer.class);
Assertions.assertEquals("Current user [operator] has no permission to access URL", response.getErrMsg());
@@ -61,7 +63,7 @@ class DataNodeControllerTest extends WebBaseTest {
@Test
void testSaveAndGetAndDelete() throws Exception {
// save
- MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getDataNodeRequest());
+ MvcResult mvcResult = postForSuccessMvcResult("/api/node/save", getHiveDataNodeRequest());
Integer dataNodeId = getResBodyObj(mvcResult, Integer.class);
Assertions.assertNotNull(dataNodeId);
@@ -71,7 +73,7 @@ class DataNodeControllerTest extends WebBaseTest {
DataNodeResponse dataNode = getResBodyObj(getResult, DataNodeResponse.class);
Assertions.assertNotNull(dataNode);
- Assertions.assertEquals(getDataNodeRequest().getName(), dataNode.getName());
+ Assertions.assertEquals(getHiveDataNodeRequest().getName(), dataNode.getName());
// delete
MvcResult deleteResult = deleteForSuccessMvcResult("/api/node/delete/{id}", dataNodeId);
@@ -99,7 +101,7 @@ class DataNodeControllerTest extends WebBaseTest {
dataNodeEntityMapper.insert(nodeEntity);
- DataNodeRequest request = getDataNodeRequest();
+ DataNodeRequest request = getHiveDataNodeRequest();
request.setId(nodeEntity.getId());
request.setName("test447777");
request.setVersion(nodeEntity.getVersion());
@@ -114,7 +116,7 @@ class DataNodeControllerTest extends WebBaseTest {
@Test
void testUpdateFailByNoId() throws Exception {
- MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getDataNodeRequest());
+ MvcResult mvcResult = postForSuccessMvcResult("/api/node/update", getHiveDataNodeRequest());
Response<Boolean> response = getResBody(mvcResult, Boolean.class);
Assertions.assertFalse(response.isSuccess());