You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/27 12:15:07 UTC
[inlong] branch master updated: [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new fcc6ee24a [INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
fcc6ee24a is described below
commit fcc6ee24a70364d5a232dc782112f1f6917dccb4
Author: woofyzhao <49...@qq.com>
AuthorDate: Tue Sep 27 20:15:01 2022 +0800
[INLONG-6018][Manager] Using the Hive data node when creating group resources (#6020)
---
.../resources/mappers/StreamSinkEntityMapper.xml | 1 +
.../manager/pojo/node/hive/HiveDataNodeInfo.java | 3 --
.../apache/inlong/manager/pojo/sink/SinkInfo.java | 1 +
.../service/node/DataNodeOperateHelper.java | 54 ++++++++++++++++++++++
.../resource/sink/hive/HiveResourceOperator.java | 31 ++++++++++++-
5 files changed, 86 insertions(+), 4 deletions(-)
diff --git a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
index bd9b72330..5d1d3ea25 100644
--- a/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
+++ b/inlong-manager/manager-dao/src/main/resources/mappers/StreamSinkEntityMapper.xml
@@ -324,6 +324,7 @@
sink.status,
sink.creator,
sink.sink_name,
+ sink.data_node_name,
stream.mq_resource,
stream.data_type,
stream.data_separator as sourceSeparator,
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
index 34b50ed06..8602762e4 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/node/hive/HiveDataNodeInfo.java
@@ -37,9 +37,6 @@ import org.apache.inlong.manager.pojo.node.DataNodeInfo;
@ApiModel("Hive data node info")
public class HiveDataNodeInfo extends DataNodeInfo {
- @ApiModelProperty("Hive JDBC URL, such as jdbc:hive2://${ip}:${port}")
- private String jdbcUrl;
-
@ApiModelProperty("Version for Hive, such as: 3.2.1")
private String hiveVersion;
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
index 916ae199e..c2d2c3fff 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sink/SinkInfo.java
@@ -32,6 +32,7 @@ public class SinkInfo {
private String inlongStreamId;
private String sinkType;
private String sinkName;
+ private String dataNodeName;
private String description;
private Integer enableCreateResource;
private String extParams;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
new file mode 100644
index 000000000..a77af7b66
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/node/DataNodeOperateHelper.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.node;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.dao.entity.DataNodeEntity;
+import org.apache.inlong.manager.dao.mapper.DataNodeEntityMapper;
+import org.apache.inlong.manager.pojo.node.DataNodeInfo;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Data node helper service: loads a data node entity and converts it to its typed DataNodeInfo
+ */
+@Slf4j
+@Service
+public class DataNodeOperateHelper {
+
+ @Autowired
+ private DataNodeEntityMapper dataNodeMapper;
+ @Autowired
+ private DataNodeOperatorFactory operatorFactory;
+
+ /**
+ * Get data node info by name and type; throws BusinessException when no matching entity is found
+ */
+ public DataNodeInfo getDataNodeInfo(String dataNodeName, String dataNodeType) {
+ DataNodeEntity entity = dataNodeMapper.selectByNameAndType(dataNodeName, dataNodeType);
+ if (entity == null) {
+ log.error("data node not found by name={}, type={}", dataNodeName, dataNodeType);
+ throw new BusinessException("data node not found"); // name and type are only in the log line above
+ }
+ DataNodeOperator dataNodeOperator = operatorFactory.getInstance(dataNodeType); // operator is chosen by type
+ DataNodeInfo dataNodeInfo = dataNodeOperator.getFromEntity(entity);
+ log.debug("success to get data node info by name={}, type={}", dataNodeName, dataNodeType);
+ return dataNodeInfo;
+ }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
index c67559140..9542c7e03 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sink/hive/HiveResourceOperator.java
@@ -18,16 +18,21 @@
package org.apache.inlong.manager.service.resource.sink.hive;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SinkType;
import org.apache.inlong.manager.common.enums.SinkStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.StreamSinkFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
+import org.apache.inlong.manager.pojo.node.hive.HiveDataNodeInfo;
import org.apache.inlong.manager.pojo.sink.SinkInfo;
import org.apache.inlong.manager.pojo.sink.hive.HiveColumnInfo;
import org.apache.inlong.manager.pojo.sink.hive.HiveSinkDTO;
import org.apache.inlong.manager.pojo.sink.hive.HiveTableInfo;
+import org.apache.inlong.manager.service.node.DataNodeOperateHelper;
import org.apache.inlong.manager.service.resource.sink.SinkResourceOperator;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.slf4j.Logger;
@@ -51,6 +56,8 @@ public class HiveResourceOperator implements SinkResourceOperator {
private StreamSinkService sinkService;
@Autowired
private StreamSinkFieldEntityMapper sinkFieldMapper;
+ @Autowired
+ private DataNodeOperateHelper dataNodeHelper;
@Override
public Boolean accept(String sinkType) {
@@ -78,6 +85,28 @@ public class HiveResourceOperator implements SinkResourceOperator {
this.createTable(sinkInfo);
}
+ private HiveSinkDTO getHiveInfo(SinkInfo sinkInfo) { // builds the Hive settings used by createTable
+ HiveSinkDTO hiveInfo = new HiveSinkDTO();
+
+ if (StringUtils.isNotBlank(sinkInfo.getExtParams())) {
+ HiveSinkDTO userSinkInfo = HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
+ CommonBeanUtils.copyProperties(userSinkInfo, hiveInfo); // start from the settings parsed out of extParams
+ }
+
+ // fall back to the data node for JDBC url, username and password when extParams gave no JDBC url
+ if (StringUtils.isBlank(hiveInfo.getJdbcUrl())) {
+ String dataNodeName = sinkInfo.getDataNodeName();
+ Preconditions.checkNotEmpty(dataNodeName, "hive jdbc url not specified and data node is empty");
+ HiveDataNodeInfo dataNodeInfo = (HiveDataNodeInfo) dataNodeHelper.getDataNodeInfo(
+ dataNodeName, sinkInfo.getSinkType()); // NOTE(review): unchecked downcast - confirm this type always yields HiveDataNodeInfo
+ CommonBeanUtils.copyProperties(dataNodeInfo, hiveInfo); // NOTE(review): confirm this cannot clobber values copied from extParams
+ hiveInfo.setJdbcUrl(dataNodeInfo.getUrl());
+ hiveInfo.setUsername(dataNodeInfo.getUsername());
+ hiveInfo.setPassword(dataNodeInfo.getToken()); // the data node token is used as the Hive password
+ }
+ return hiveInfo;
+ }
+
private void createTable(SinkInfo sinkInfo) {
LOGGER.info("begin to create hive table for sinkId={}", sinkInfo.getId());
@@ -97,7 +126,7 @@ public class HiveResourceOperator implements SinkResourceOperator {
}
try {
- HiveSinkDTO hiveInfo = HiveSinkDTO.getFromJson(sinkInfo.getExtParams());
+ HiveSinkDTO hiveInfo = this.getHiveInfo(sinkInfo);
HiveTableInfo tableInfo = HiveSinkDTO.getHiveTableInfo(hiveInfo, columnList);
String url = hiveInfo.getJdbcUrl();
String user = hiveInfo.getUsername();